Ryan Williams
2010-01-14 15:32:37 -08:00
43 changed files with 2559 additions and 986 deletions


@@ -1,40 +1,71 @@
Basic Usage
===========
Most of the APIs required for basic eventlet usage are exported by the eventlet.api module.
Eventlet is built around the concept of green threads (i.e. coroutines) that are launched to do network-related work. Green threads differ from normal threads in two main ways:
* Green threads are so cheap they are nearly free. You do not have to conserve green threads like you would normal threads. In general, there will be at least one green thread per network connection. Switching between them is quite efficient.
* Green threads cooperatively yield to each other instead of being preemptively scheduled. The major advantage of this behavior is that shared data structures don't need locks, because another green thread can only access a data structure after the current one explicitly yields. It is also possible to inspect communication primitives such as queues to see if they have any data or waiting green threads, something that is not possible with preemptive threads.
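Here is a minimal sketch (not part of this commit) of how explicit yielding looks; each green thread runs until it calls ``sleep(0)``, which gives the others a chance to run::

import eventlet

def worker(n):
    print "worker %d starting" % n
    eventlet.sleep(0)   # explicit cooperative yield; other green threads run here
    return n * 2

threads = [eventlet.spawn(worker, i) for i in range(3)]
print [gt.wait() for gt in threads]   # wait() collects each green thread's result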
Here are some basic functions that manipulate coroutines.
There are a bunch of basic patterns that Eventlet usage falls into. One is the client pattern, which makes a bunch of requests to servers and processes the responses. Another is the server pattern, where an application holds open a socket and processes requests that are incoming on it. These two patterns involve somewhat different usage of Eventlet's primitives, so here are a few examples to show them off.
.. automethod:: eventlet.api::spawn
Client-side pattern
--------------------
.. automethod:: eventlet.api::sleep
.. automethod:: eventlet.api::call_after
.. automethod:: eventlet.api::exc_after
Socket Functions
-----------------
.. |socket| replace:: ``socket.socket``
.. _socket: http://docs.python.org/library/socket.html#socket-objects
.. |select| replace:: ``select.select``
.. _select: http://docs.python.org/library/select.html
The canonical client-side example is a web crawler. The crawler is given a list of urls and retrieves their bodies for later processing. Here is a very simple example::
Eventlet provides convenience functions that return green sockets. The green
socket objects have the same interface as the standard library |socket|_
object, except they will automatically cooperatively yield control to other
eligible coroutines instead of blocking. Eventlet also has the ability to
monkey patch the standard library |socket|_ object so that code which uses
it will also automatically cooperatively yield; see
:ref:`using_standard_library_with_eventlet`.
urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
"https://wiki.secondlife.com/w/images/secondlife.jpg",
"http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]
.. automethod:: eventlet.api::tcp_listener
import eventlet
from eventlet.green import urllib2
.. automethod:: eventlet.api::connect_tcp
def fetch(url):
return urllib2.urlopen(url).read()
pool = eventlet.GreenPool(200)
for body in pool.imap(fetch, urls):
print "got body", len(body)
There is a slightly more complex version of this in the file ``examples/webcrawler.py`` in the source distribution. Here's a tour of the interesting lines in this crawler.
``from eventlet.green import urllib2`` is how you import a cooperatively-yielding version of urllib2. It is identical in all respects to the standard version, except that it uses green sockets for its communication.
``pool = eventlet.GreenPool(200)`` constructs a pool of 200 green threads. Using a pool is good practice because it provides an upper limit on the amount of work that this crawler will be doing simultaneously, which comes in handy when the input data changes dramatically.
``for body in pool.imap(fetch, urls):`` iterates over the results of calling the fetch function in parallel. :meth:`imap <eventlet.greenpool.GreenPool.imap>` makes the function calls in parallel, and the results are returned in the order that the calls were made.
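When the work doesn't fit a single ``imap`` call, the new :class:`GreenPile <eventlet.greenpool.GreenPile>` gives the same ordered-results behavior with incremental spawning. A sketch, reusing the ``fetch`` function and ``urls`` list from above::

import eventlet

pool = eventlet.GreenPool(200)
pile = eventlet.GreenPile(pool)   # shares the pool's concurrency limit
for url in urls:
    pile.spawn(fetch, url)        # launch the fetches one at a time
for body in pile:                 # results arrive in spawn order
    print "got body", len(body)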
Server-side pattern
--------------------
Here's a simple server-side example, a simple echo server::
import eventlet
from eventlet.green import socket
def handle(client):
while True:
c = client.recv(1)
if not c: break
client.sendall(c)
server = socket.socket()
server.bind(('0.0.0.0', 6000))
server.listen(50)
pool = eventlet.GreenPool(10000)
while True:
new_sock, address = server.accept()
pool.spawn_n(handle, new_sock)
The file ``examples/echoserver.py`` contains a somewhat more robust and complex version of this example.
``from eventlet.green import socket`` imports eventlet's socket module, which is just like the regular socket module, but cooperatively yielding.
``pool = eventlet.GreenPool(10000)`` creates a pool of green threads that could handle ten thousand clients.
``pool.spawn_n(handle, new_sock)`` launches a green thread to handle the new client. The accept loop doesn't care about the return value of the handle function, so it uses :meth:`spawn_n <eventlet.greenpool.GreenPool.spawn_n>` instead of :meth:`spawn <eventlet.greenpool.GreenPool.spawn>`. This is a little bit more efficient.
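Here is a self-contained sketch (not from the docs) of the difference; the ``wait()`` call on the returned GreenThread is what makes :meth:`spawn` results retrievable::

import eventlet

def double(x):
    return x * 2

pool = eventlet.GreenPool(100)
gt = pool.spawn(double, 10)   # returns a GreenThread; the result is retrievable
print gt.wait()               # prints 20
pool.spawn_n(double, 10)      # returns nothing; slightly cheaper
pool.waitall()                # let the fire-and-forget call finish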
.. automethod:: eventlet.api::ssl_listener
.. _using_standard_library_with_eventlet:
@@ -46,8 +77,8 @@ Using the Standard Library with Eventlet
Eventlet's socket object, whose implementation can be found in the
:mod:`eventlet.greenio` module, is designed to match the interface of the
standard library |socket|_ object. However, it is often useful to be able to
use existing code which uses |socket|_ directly without modifying it to use the
standard library :mod:`socket` object. However, it is often useful to be able to
use existing code which uses :mod:`socket` directly without modifying it to use the
eventlet apis. To do this, one must call
:func:`~eventlet.util.wrap_socket_with_coroutine_socket`. It is only necessary
to do this once, at the beginning of the program, and it should be done before
@@ -58,7 +89,7 @@ whether this is a good or a bad idea, please let us know.
.. automethod:: eventlet.util::wrap_select_with_coroutine_select
Some code which is written in a multithreaded style may perform some tricks,
such as calling |select|_ with only one file descriptor and a timeout to
such as calling :mod:`select` with only one file descriptor and a timeout to
prevent the operation from being unbounded. For this specific situation there
is :func:`~eventlet.util.wrap_select_with_coroutine_select`; however it's
always a good idea when trying any new library with eventlet to perform some
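A sketch of the wrapping sequence this section describes; calling both functions with no arguments is an assumption here, as their signatures aren't shown in this diff::

from eventlet import util
# patch before any module that uses sockets or select gets imported
util.wrap_socket_with_coroutine_socket()
util.wrap_select_with_coroutine_select()

import urllib2   # the plain stdlib module, now green underneath
print len(urllib2.urlopen('http://www.google.com/intl/en_ALL/images/logo.gif').read())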


@@ -3,9 +3,10 @@ Chat Server Example
Let's look at a simple example, a chat server::
from eventlet import api
import eventlet
from eventlet.green import socket
participants = [ ]
participants = []
def read_chat_forever(writer, reader):
line = reader.readline()
@@ -14,22 +15,27 @@ Let's look at a simple example, a chat server::
for p in participants:
if p is not writer: # Don't echo
p.write(line)
p.flush()
line = reader.readline()
participants.remove(writer)
print "Participant left chat."
try:
print "ChatServer starting up on port 3000"
server = api.tcp_listener(('0.0.0.0', 3000))
server = socket.socket()
server.bind(('0.0.0.0', 3000))
server.listen(50)
while True:
new_connection, address = server.accept()
print "Participant joined chat."
new_writer = new_connection.makefile('w')
participants.append(new_writer)
api.spawn(read_chat_forever, new_writer, new_connection.makefile('r'))
except KeyboardInterrupt:
eventlet.spawn_n(read_chat_forever,
new_writer,
new_connection.makefile('r'))
except (KeyboardInterrupt, SystemExit):
print "ChatServer exiting."
The server shown here is very easy to understand. If it were written using Python's threading module instead of eventlet, the control flow and code layout would be exactly the same. The call to :func:`~eventlet.api.tcp_listener` would be replaced with the appropriate calls to Python's built-in ``socket`` module, and the call to :func:`~eventlet.api.spawn` would be replaced with the appropriate call to the ``thread`` module. However, if implemented using the ``thread`` module, each new connection would require the operating system to allocate another 8 MB stack, meaning this simple program would consume all of the RAM on a machine with 1 GB of memory with only 128 users connected, without even taking into account memory used by any objects on the heap! Using eventlet, this simple program can accommodate thousands and thousands of simultaneous users, consuming very little RAM and very little CPU.
The server shown here is very easy to understand. If it were written using Python's threading module instead of eventlet, the control flow and code layout would be exactly the same. The call to :func:`~eventlet.spawn` would be replaced with the appropriate call to the :mod:`threading` module. Using Eventlet, this simple program can accommodate thousands and thousands of simultaneous users, consuming very little RAM and very little CPU.
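For comparison, here is a sketch (not part of the example) of what the accept loop above would look like with OS threads::

import threading

while True:
    new_connection, address = server.accept()
    print "Participant joined chat."
    new_writer = new_connection.makefile('w')
    participants.append(new_writer)
    threading.Thread(target=read_chat_forever,
                     args=(new_writer, new_connection.makefile('r'))).start()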
What sort of servers would require concurrency like this? A typical Web server might measure traffic on the order of 10 requests per second; at any given moment, the server might only have a handful of HTTP connections open simultaneously. However, a chat server, instant messenger server, or multiplayer game server will need to maintain one connection per online user to be able to send messages to them as other users chat or make moves in the game. Also, as advanced Web development techniques such as Ajax, Ajax polling, and Comet (the "Long Poll") become more popular, Web servers will need to be able to deal with many more simultaneous requests. In fact, since the Comet technique involves the client making a new request as soon as the server closes an old one, a Web server servicing Comet clients has the same characteristics as a chat or game server: one connection per online user.


@@ -3,38 +3,26 @@ Eventlet
Eventlet is a networking library written in Python. It achieves high scalability by using `non-blocking io <http://en.wikipedia.org/wiki/Asynchronous_I/O#Select.28.2Fpoll.29_loops>`_ while at the same time retaining high programmer usability by using `coroutines <http://en.wikipedia.org/wiki/Coroutine>`_ to make the non-blocking io operations appear blocking at the source code level.
Eventlet is different from all the other event-based frameworks out there because it doesn't require you to restructure your code to use it. You don't have to rewrite your code to use callbacks, and you don't have to replace your main() method with some sort of dispatch method. You can just sprinkle eventlet on top of your normal-looking code.
Eventlet is different from other event-based frameworks out there because it doesn't require you to restructure your code to use it. You don't have to rewrite your code to use callbacks, and you don't have to replace your main() method with some sort of dispatch method. You can just sprinkle eventlet on top of your code.
Web Crawler Example
-------------------
This is a simple web "crawler" that fetches a bunch of urls using a coroutine pool. It has as much concurrency (i.e. pages being fetched simultaneously) as coroutines in the pool (in our example, 4).
::
This is a simple web crawler that fetches a bunch of urls using a coroutine pool. It has as much concurrency (i.e. pages being fetched simultaneously) as coroutines in the pool::
urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
"http://wiki.secondlife.com/w/images/secondlife.jpg",
"https://wiki.secondlife.com/w/images/secondlife.jpg",
"http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]
import time
from eventlet import coros
# this imports a special version of the urllib2 module that uses non-blocking IO
import eventlet
from eventlet.green import urllib2
def fetch(url):
print "%s fetching %s" % (time.asctime(), url)
data = urllib2.urlopen(url)
print "%s fetched %s" % (time.asctime(), data)
return urllib2.urlopen(url).read()
pool = coros.CoroutinePool(max_size=4)
waiters = []
for url in urls:
waiters.append(pool.execute(fetch, url))
# wait for all the coroutines to come back before exiting the process
for waiter in waiters:
waiter.wait()
pool = eventlet.GreenPool(200)
for body in pool.imap(fetch, urls):
print "got body", len(body)
Contents


@@ -10,12 +10,10 @@ Module Reference
modules/coros
modules/db_pool
modules/greenio
modules/pool
modules/greenpool
modules/pools
modules/processes
modules/proc
modules/saranwrap
modules/timer
modules/tpool
modules/util
modules/wsgi


@@ -3,4 +3,4 @@
.. automodule:: eventlet.backdoor
:members:
:undoc-members:


@@ -1,5 +1,5 @@
:mod:`greenio` -- Greenlet file objects
========================================
:mod:`greenio` -- Cooperative network primitives
=================================================
.. automodule:: eventlet.greenio
:members:


@@ -0,0 +1,5 @@
:mod:`greenpool` -- Green Thread Pools
========================================
.. automodule:: eventlet.greenpool
:members:


@@ -1,6 +0,0 @@
:mod:`pool` -- Concurrent execution from a pool of coroutines
==============================================================
.. automodule:: eventlet.pool
:members:
:undoc-members:


@@ -1,6 +1,5 @@
:mod:`pools`
==================
:mod:`pools` - Generic pools of resources
==========================================
.. automodule:: eventlet.pools
:members:
:undoc-members:


@@ -1,6 +0,0 @@
:mod:`timer`
==================
.. automodule:: eventlet.timer
:members:
:undoc-members:


@@ -1,6 +0,0 @@
:mod:`tpool` -- Thread pooling
================================
.. automodule:: eventlet.tpool
:members:
:undoc-members:


@@ -1,6 +1,29 @@
:mod:`wsgi` -- WSGI server
===========================
The wsgi module provides a simple and easy way to start an event-driven
`WSGI <http://wsgi.org/wsgi/>`_ server. This can serve as an embedded
web server in an application, or as the basis for a more full-featured web
server package. One such package is `Spawning <http://pypi.python.org/pypi/Spawning/>`_.
To launch a wsgi server, simply create a socket and call :func:`eventlet.wsgi.server` with it::
from eventlet import wsgi
from eventlet.green import socket
def hello_world(env, start_response):
start_response('200 OK', [('Content-Type', 'text/plain')])
return ['Hello, World!\r\n']
sock = socket.socket()
sock.bind(('', 8090))
sock.listen(500)
wsgi.server(sock, hello_world)
You can find a slightly more elaborate version of this code in the file
``examples/wsgi.py``.
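Once the server is running, any HTTP client can talk to it. For example, from another process, a sketch using the green urllib2::

from eventlet.green import urllib2
print urllib2.urlopen('http://localhost:8090/').read()   # prints Hello, World!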
.. automodule:: eventlet.wsgi
:members:
:undoc-members:


@@ -1,5 +1,5 @@
Using Eventlet with Threads
=============================
Threads
========
Eventlet is thread-safe and can be used in conjunction with normal Python threads. The way this works is that coroutines are confined to their 'parent' Python thread. It's like each thread contains its own little world of coroutines that can switch between themselves but not between coroutines in other threads.
@@ -9,6 +9,11 @@ You can only communicate cross-thread using the "real" thread primitives and pip
The vast majority of the times you'll want to use threads are to wrap some operation that is not "green", such as a C library that uses its own OS calls to do socket operations. The :mod:`~eventlet.tpool` module is provided to make these uses simpler.
The pyevent hub is not compatible with threads.
Tpool - Simple thread pool
---------------------------
The simplest thing to do with :mod:`~eventlet.tpool` is to :func:`~eventlet.tpool.execute` a function with it. The function will be run in a random thread in the pool, while the calling coroutine blocks on its completion::
>>> import thread
@@ -20,3 +25,6 @@ The simplest thing to do with :mod:`~eventlet.tpool` is to :func:`~eventlet.tpoo
running in new thread: True
By default there are 20 threads in the pool, but you can configure this by setting the environment variable ``EVENTLET_THREADPOOL_SIZE`` to the desired pool size before importing tpool.
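For example, here is a sketch of pushing a blocking call into the pool so that only the calling coroutine waits on it::

import os
from eventlet import tpool

# os.listdir runs in one of the pool's OS threads; other green threads keep running
names = tpool.execute(os.listdir, '/tmp')
print len(names)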
.. automodule:: eventlet.tpool
:members:


@@ -1,2 +1,21 @@
version_info = (0, 9, '3pre')
__version__ = '%s.%s.%s' % version_info
from eventlet import greenthread
from eventlet import greenpool
from eventlet import queue
sleep = greenthread.sleep
spawn = greenthread.spawn
spawn_n = greenthread.spawn_n
call_after_global = greenthread.call_after_global
call_after_local = greenthread.call_after_local
TimeoutError = greenthread.TimeoutError
exc_after = greenthread.exc_after
with_timeout = greenthread.with_timeout
GreenPool = greenpool.GreenPool
GreenPile = greenpool.GreenPile
Queue = queue.Queue
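A quick sketch (not part of the file) of what these new top-level aliases enable::

import eventlet

def add(a, b):
    return a + b

gt = eventlet.spawn(add, 2, 3)   # greenthread.spawn, via the alias above
print gt.wait()                  # prints 5
eventlet.sleep(0)                # cooperative yield via the sleep alias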


@@ -4,9 +4,12 @@ import socket
import string
import linecache
import inspect
import warnings
from eventlet.support import greenlets as greenlet
from eventlet.hubs import get_hub as get_hub_, get_default_hub as get_default_hub_, use_hub as use_hub_
from eventlet import greenthread
from eventlet import debug
__all__ = [
'call_after', 'exc_after', 'getcurrent', 'get_default_hub', 'get_hub',
@@ -14,8 +17,6 @@ __all__ = [
'ssl_listener', 'tcp_listener', 'trampoline',
'unspew', 'use_hub', 'with_timeout', 'timeout']
import warnings
def get_hub(*a, **kw):
warnings.warn("eventlet.api.get_hub has moved to eventlet.hubs.get_hub",
DeprecationWarning, stacklevel=2)
@@ -38,10 +39,6 @@ def switch(coro, result=None, exc=None):
Greenlet = greenlet.greenlet
class TimeoutError(Exception):
"""Exception raised if an asynchronous operation times out"""
pass
def tcp_listener(address, backlog=50):
"""
@@ -81,6 +78,7 @@ def connect_tcp(address, localaddr=None):
desc.connect(address)
return desc
TimeoutError = greenthread.TimeoutError
def trampoline(fd, read=None, write=None, timeout=None, timeout_exc=TimeoutError):
"""Suspend the current coroutine until the given socket object or file
@@ -119,86 +117,20 @@ def trampoline(fd, read=None, write=None, timeout=None, timeout_exc=TimeoutError
t.cancel()
def _spawn_startup(cb, args, kw, cancel=None):
try:
greenlet.getcurrent().parent.switch()
cancel = None
finally:
if cancel is not None:
cancel()
return cb(*args, **kw)
spawn = greenthread.spawn
spawn_n = greenthread.spawn_n
def _spawn(g):
g.parent = greenlet.getcurrent()
g.switch()
def spawn(function, *args, **kwds):
"""Create a new coroutine, or cooperative thread of control, within which
to execute *function*.
The *function* will be called with the given *args* and keyword arguments
*kwds* and will remain in control unless it cooperatively yields by
calling a socket method or ``sleep()``.
:func:`spawn` returns control to the caller immediately, and *function*
will be called in a future main loop iteration.
An uncaught exception in *function* or any child will terminate the new
coroutine with a log message.
"""
# killable
t = None
g = Greenlet(_spawn_startup)
t = get_hub_().schedule_call_global(0, _spawn, g)
g.switch(function, args, kwds, t.cancel)
return g
def kill(g, *throw_args):
get_hub_().schedule_call_global(0, g.throw, *throw_args)
if getcurrent() is not get_hub_().greenlet:
sleep(0)
def call_after_global(seconds, function, *args, **kwds):
"""Schedule *function* to be called after *seconds* have elapsed.
The function will be scheduled even if the current greenlet has exited.
*seconds* may be specified as an integer, or a float if fractional seconds
are desired. The *function* will be called with the given *args* and
keyword arguments *kwds*, and will be executed within the main loop's
coroutine.
call_after = greenthread.call_after
call_after_local = greenthread.call_after_local
call_after_global = greenthread.call_after_global
Its return value is discarded. Any uncaught exception will be logged.
"""
# cancellable
def startup():
g = Greenlet(_spawn_startup)
g.switch(function, args, kwds)
g.switch()
t = get_hub_().schedule_call_global(seconds, startup)
return t
def call_after_local(seconds, function, *args, **kwds):
"""Schedule *function* to be called after *seconds* have elapsed.
The function will NOT be called if the current greenlet has exited.
*seconds* may be specified as an integer, or a float if fractional seconds
are desired. The *function* will be called with the given *args* and
keyword arguments *kwds*, and will be executed within the main loop's
coroutine.
Its return value is discarded. Any uncaught exception will be logged.
"""
# cancellable
def startup():
g = Greenlet(_spawn_startup)
g.switch(function, args, kwds)
g.switch()
t = get_hub_().schedule_call_local(seconds, startup)
return t
# for compatibility with original eventlet API
call_after = call_after_local
class _SilentException:
pass
@@ -252,156 +184,17 @@ class timeout(object):
if typ is _SilentException and value in self.throw_args:
return True
def with_timeout(seconds, func, *args, **kwds):
"""Wrap a call to some (yielding) function with a timeout; if the called
function fails to return before the timeout, cancel it and return a flag
value.
with_timeout = greenthread.with_timeout
:param seconds: seconds before timeout occurs
:type seconds: int or float
:param func: the callable to execute with a timeout; must be one of the
functions that implicitly or explicitly yields
:param \*args: positional arguments to pass to *func*
:param \*\*kwds: keyword arguments to pass to *func*
:param timeout_value: value to return if timeout occurs (default raise
:class:`~eventlet.api.TimeoutError`)
:rtype: Value returned by *func* if *func* returns before *seconds*, else
*timeout_value* if provided, else raise ``TimeoutError``
:exception TimeoutError: if *func* times out and no ``timeout_value`` has
been provided.
:exception *any*: Any exception raised by *func*
**Example**::
data = with_timeout(30, httpc.get, 'http://www.google.com/', timeout_value="")
Here *data* is either the result of the ``get()`` call, or the empty string if
it took too long to return. Any exception raised by the ``get()`` call is
passed through to the caller.
"""
# Recognize a specific keyword argument, while also allowing pass-through
# of any other keyword arguments accepted by func. Use pop() so we don't
# pass timeout_value through to func().
has_timeout_value = "timeout_value" in kwds
timeout_value = kwds.pop("timeout_value", None)
error = TimeoutError()
timeout = exc_after(seconds, error)
try:
try:
return func(*args, **kwds)
except TimeoutError, ex:
if ex is error and has_timeout_value:
return timeout_value
raise
finally:
timeout.cancel()
def exc_after(seconds, *throw_args):
"""Schedule an exception to be raised into the current coroutine
after *seconds* have elapsed.
This only works if the current coroutine is yielding, and is generally
used to set timeouts after which a network operation or series of
operations will be canceled.
Returns a :class:`~eventlet.timer.Timer` object with a
:meth:`~eventlet.timer.Timer.cancel` method which should be used to
prevent the exception if the operation completes successfully.
See also :func:`~eventlet.api.with_timeout` that encapsulates the idiom below.
Example::
def read_with_timeout():
timer = api.exc_after(30, RuntimeError())
try:
httpc.get('http://www.google.com/')
except RuntimeError:
print "Timed out!"
else:
timer.cancel()
"""
return call_after(seconds, getcurrent().throw, *throw_args)
def sleep(seconds=0):
"""Yield control to another eligible coroutine until at least *seconds* have
elapsed.
*seconds* may be specified as an integer, or a float if fractional seconds
are desired. Calling :func:`~eventlet.api.sleep` with *seconds* of 0 is the
canonical way of expressing a cooperative yield. For example, if one is
looping over a large list performing an expensive calculation without
calling any socket methods, it's a good idea to call ``sleep(0)``
occasionally; otherwise nothing else will run.
"""
hub = get_hub_()
assert hub.greenlet is not greenlet.getcurrent(), 'do not call blocking functions from the mainloop'
timer = hub.schedule_call_global(seconds, greenlet.getcurrent().switch)
try:
hub.switch()
finally:
timer.cancel()
exc_after = greenthread.exc_after
sleep = greenthread.sleep
getcurrent = greenlet.getcurrent
GreenletExit = greenlet.GreenletExit
class Spew(object):
"""
"""
def __init__(self, trace_names=None, show_values=True):
self.trace_names = trace_names
self.show_values = show_values
def __call__(self, frame, event, arg):
if event == 'line':
lineno = frame.f_lineno
if '__file__' in frame.f_globals:
filename = frame.f_globals['__file__']
if (filename.endswith('.pyc') or
filename.endswith('.pyo')):
filename = filename[:-1]
name = frame.f_globals['__name__']
line = linecache.getline(filename, lineno)
else:
name = '[unknown]'
try:
src = inspect.getsourcelines(frame)
line = src[lineno]
except IOError:
line = 'Unknown code named [%s]. VM instruction #%d' % (
frame.f_code.co_name, frame.f_lasti)
if self.trace_names is None or name in self.trace_names:
print '%s:%s: %s' % (name, lineno, line.rstrip())
if not self.show_values:
return self
details = '\t'
tokens = line.translate(
string.maketrans(' ,.()', '\0' * 5)).split('\0')
for tok in tokens:
if tok in frame.f_globals:
details += '%s=%r ' % (tok, frame.f_globals[tok])
if tok in frame.f_locals:
details += '%s=%r ' % (tok, frame.f_locals[tok])
if details.strip():
print details
return self
def spew(trace_names=None, show_values=False):
"""Install a trace hook which writes incredibly detailed logs
about what code is being executed to stdout.
"""
sys.settrace(Spew(trace_names, show_values))
def unspew():
"""Remove the trace hook installed by spew.
"""
sys.settrace(None)
spew = debug.spew
unspew = debug.unspew
def named(name):


@@ -3,7 +3,7 @@ import sys
import errno
from code import InteractiveConsole
from eventlet import api
from eventlet import api, hubs
from eventlet.support import greenlets
try:
@@ -16,25 +16,34 @@ except AttributeError:
sys.ps2 = '... '
class FileProxy(object):
def __init__(self, f):
self.f = f
def writeflush(*a, **kw):
f.write(*a, **kw)
f.flush()
self.fixups = {
'softspace': 0,
'isatty': lambda: True,
'flush': lambda: None,
'write': writeflush,
'readline': lambda *a: f.readline(*a).replace('\r\n', '\n'),
}
def __getattr__(self, attr):
fixups = object.__getattribute__(self, 'fixups')
if attr in fixups:
return fixups[attr]
f = object.__getattribute__(self, 'f')
return getattr(f, attr)
class SocketConsole(greenlets.greenlet):
def __init__(self, desc, hostport, locals):
self.hostport = hostport
self.locals = locals
# mangle the socket
self.desc = desc
readline = desc.readline
self.old = {}
self.fixups = {
'softspace': 0,
'isatty': lambda: True,
'flush': lambda: None,
'readline': lambda *a: readline(*a).replace('\r\n', '\n'),
}
for key, value in self.fixups.iteritems():
if hasattr(desc, key):
self.old[key] = getattr(desc, key)
setattr(desc, key, value)
self.desc = FileProxy(desc)
greenlets.greenlet.__init__(self)
def run(self):
@@ -55,49 +64,41 @@ class SocketConsole(greenlets.greenlet):
def finalize(self):
# restore the state of the socket
for key in self.fixups:
try:
value = self.old[key]
except KeyError:
delattr(self.desc, key)
else:
setattr(self.desc, key, value)
self.fixups.clear()
self.old.clear()
self.desc = None
print "backdoor closed to %s:%s" % self.hostport
def backdoor_server(server, locals=None):
""" Runs a backdoor server on the socket, accepting connections and
running backdoor consoles for each client that connects.
def backdoor_server(sock, locals=None):
""" Blocking function that runs a backdoor server on the socket *sock*,
accepting connections and running backdoor consoles for each client that
connects.
"""
print "backdoor server listening on %s:%s" % server.getsockname()
print "backdoor server listening on %s:%s" % sock.getsockname()
try:
try:
while True:
socketpair = server.accept()
socketpair = sock.accept()
backdoor(socketpair, locals)
except socket.error, e:
# Broken pipe means it was shutdown
if e[0] != errno.EPIPE:
raise
finally:
server.close()
sock.close()
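# Usage sketch (not part of this module): run the blocking backdoor_server in
# its own green thread so that the rest of the application keeps running:
#
#     from eventlet import api, backdoor
#     api.spawn(backdoor.backdoor_server,
#               api.tcp_listener(('127.0.0.1', 3000)))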
def backdoor((conn, addr), locals=None):
"""Sets up an interactive console on a socket with a connected client.
This does not block the caller, as it spawns a new greenlet to handle
the console.
"""Sets up an interactive console on a socket with a single connected
client. This does not block the caller, as it spawns a new greenlet to
handle the console. This is meant to be called from within an accept loop
(such as backdoor_server).
"""
host, port = addr
print "backdoor to %s:%s" % (host, port)
fl = conn.makeGreenFile("rw")
fl.newlines = '\n'
greenlet = SocketConsole(fl, (host, port), locals)
fl = conn.makefile("rw")
console = SocketConsole(fl, (host, port), locals)
hub = hubs.get_hub()
hub.schedule_call_global(0, greenlet.switch)
hub.schedule_call_global(0, console.switch)
if __name__ == '__main__':


@@ -4,7 +4,9 @@ def get_ident():
""" Returns ``id()`` of current greenlet. Useful for debugging."""
return id(api.getcurrent())
# TODO: The base threadlocal class wants to call __init__ on itself for every new thread that associates with it; our corolocal doesn't do this, but should for 100% compatibility. The implementation in _threading_local.py is so janky....
class local(object):
"""Coroutine equivalent of threading.local class."""
def __getattribute__(self, attr, g=get_ident):
try:
d = object.__getattribute__(self, '__dict__')


@@ -5,11 +5,7 @@ import warnings
from eventlet import api
from eventlet import hubs
class Cancelled(RuntimeError):
pass
from eventlet import greenthread
class NOT_USED:
def __repr__(self):
@@ -17,179 +13,19 @@ class NOT_USED:
NOT_USED = NOT_USED()
class Event(object):
"""An abstraction where an arbitrary number of coroutines
can wait for one event from another.
def Event(*a, **kw):
warnings.warn("The Event class has been moved to the greenthread module! "
"Please construct greenthread.Event objects instead.",
DeprecationWarning, stacklevel=2)
return greenthread.Event(*a, **kw)
Events differ from channels in two ways:
1. calling :meth:`send` does not unschedule the current coroutine
2. :meth:`send` can only be called once; use :meth:`reset` to prepare the
event for another :meth:`send`
They are ideal for communicating return values between coroutines.
>>> from eventlet import coros, api
>>> evt = coros.Event()
>>> def baz(b):
... evt.send(b + 1)
...
>>> _ = api.spawn(baz, 3)
>>> evt.wait()
4
"""
_result = None
def __init__(self):
self._waiters = set()
self.reset()
def __str__(self):
params = (self.__class__.__name__, hex(id(self)), self._result, self._exc, len(self._waiters))
return '<%s at %s result=%r _exc=%r _waiters[%d]>' % params
def reset(self):
""" Reset this event so it can be used to send again.
Can only be called after :meth:`send` has been called.
>>> from eventlet import coros
>>> evt = coros.Event()
>>> evt.send(1)
>>> evt.reset()
>>> evt.send(2)
>>> evt.wait()
2
Calling reset multiple times in a row is an error.
>>> evt.reset()
>>> evt.reset()
Traceback (most recent call last):
...
AssertionError: Trying to re-reset() a fresh event.
"""
assert self._result is not NOT_USED, 'Trying to re-reset() a fresh event.'
self.epoch = time.time()
self._result = NOT_USED
self._exc = None
def ready(self):
""" Return true if the :meth:`wait` call will return immediately.
Used to avoid waiting for things that might take a while to time out.
For example, you can put a bunch of events into a list, and then visit
them all repeatedly, calling :meth:`ready` until one returns ``True``,
and then you can :meth:`wait` on that one."""
return self._result is not NOT_USED
def has_exception(self):
return self._exc is not None
def has_result(self):
return self._result is not NOT_USED and self._exc is None
def poll(self, notready=None):
if self.ready():
return self.wait()
return notready
# QQQ make it return tuple (type, value, tb) instead of raising
# because
# 1) "poll" does not imply raising
# 2) it's better not to screw up caller's sys.exc_info() by default
# (e.g. if caller wants to calls the function in except or finally)
def poll_exception(self, notready=None):
if self.has_exception():
return self.wait()
return notready
def poll_result(self, notready=None):
if self.has_result():
return self.wait()
return notready
def wait(self):
"""Wait until another coroutine calls :meth:`send`.
Returns the value the other coroutine passed to
:meth:`send`.
>>> from eventlet import coros, api
>>> evt = coros.Event()
>>> def wait_on():
... retval = evt.wait()
... print "waited for", retval
>>> _ = api.spawn(wait_on)
>>> evt.send('result')
>>> api.sleep(0)
waited for result
Returns immediately if the event has already
occurred.
>>> evt.wait()
'result'
"""
if self._result is NOT_USED:
self._waiters.add(api.getcurrent())
try:
return hubs.get_hub().switch()
finally:
self._waiters.discard(api.getcurrent())
if self._exc is not None:
api.getcurrent().throw(*self._exc)
return self._result
def send(self, result=None, exc=None):
"""Makes arrangements for the waiters to be woken with the
result and then returns immediately to the parent.
>>> from eventlet import coros, api
>>> evt = coros.Event()
>>> def waiter():
... print 'about to wait'
... result = evt.wait()
... print 'waited for', result
>>> _ = api.spawn(waiter)
>>> api.sleep(0)
about to wait
>>> evt.send('a')
>>> api.sleep(0)
waited for a
It is an error to call :meth:`send` multiple times on the same event.
>>> evt.send('whoops')
Traceback (most recent call last):
...
AssertionError: Trying to re-send() an already-triggered event.
Use :meth:`reset` between :meth:`send` s to reuse an event object.
"""
assert self._result is NOT_USED, 'Trying to re-send() an already-triggered event.'
self._result = result
if exc is not None and not isinstance(exc, tuple):
exc = (exc, )
self._exc = exc
hub = hubs.get_hub()
if self._waiters:
hub.schedule_call_global(0, self._do_send, self._result, self._exc, self._waiters.copy())
def _do_send(self, result, exc, waiters):
while waiters:
waiter = waiters.pop()
if waiter in self._waiters:
if exc is None:
waiter.switch(result)
else:
waiter.throw(*exc)
def send_exception(self, *args):
# the arguments are the same as for greenlet.throw
return self.send(None, args)
def event(*a, **kw):
warnings.warn("The event class has been capitalized! Please construct"
" Event objects instead.", DeprecationWarning, stacklevel=2)
return Event(*a, **kw)
warnings.warn("The event class has been capitalized and moved! Please "
"construct greenthread.Event objects instead.",
DeprecationWarning, stacklevel=2)
return greenthread.Event(*a, **kw)
class Semaphore(object):
"""An unbounded semaphore.
@@ -348,7 +184,7 @@ class metaphore(object):
"""
def __init__(self):
self.counter = 0
self.event = Event()
self.event = greenthread.Event()
# send() right away, else we'd wait on the default 0 count!
self.event.send()
@@ -397,14 +233,14 @@ def execute(func, *args, **kw):
>>> evt.wait()
('foo', 1)
"""
evt = Event()
def _really_execute():
evt.send(func(*args, **kw))
api.spawn(_really_execute)
return evt
warnings.warn("Coros.execute is deprecated. Please use eventlet.spawn "
"instead.", DeprecationWarning, stacklevel=2)
return greenthread.spawn(func, *args, **kw)
def CoroutinePool(*args, **kwargs):
warnings.warn("CoroutinePool is deprecated. Please use "
"eventlet.GreenPool instead.", DeprecationWarning, stacklevel=2)
from eventlet.pool import Pool
return Pool(*args, **kwargs)
@@ -470,6 +306,12 @@ class Queue(object):
def waiting(self):
return len(self._waiters)
def __iter__(self):
return self
def next(self):
return self.wait()
class Channel(object):
@@ -589,7 +431,7 @@ class Actor(object):
serially.
"""
self._mailbox = collections.deque()
self._event = Event()
self._event = greenthread.Event()
self._killer = api.spawn(self.run_forever)
self._pool = CoroutinePool(min_size=0, max_size=concurrency)
@@ -598,7 +440,7 @@ class Actor(object):
while True:
if not self._mailbox:
self._event.wait()
self._event = Event()
self._event = greenthread.Event()
else:
# leave the message in the mailbox until after it's
# been processed so the event doesn't get triggered
@@ -639,7 +481,7 @@ class Actor(object):
coroutine in a predictable manner, but this kinda defeats the point of
the :class:`Actor`, so don't do it in a real application.
>>> evt = Event()
>>> evt = greenthread.Event()
>>> a.cast( ("message 1", evt) )
>>> evt.wait() # force it to run at this exact moment
received message 1

eventlet/debug.py Normal file

@@ -0,0 +1,88 @@
"""The debug module contains utilities and functions for better
debugging Eventlet-powered applications."""
import inspect
import linecache
import string
import sys

__all__ = ['spew', 'unspew', 'hub_listener_stacks',
'hub_exceptions', 'tpool_exceptions']
class Spew(object):
"""
"""
def __init__(self, trace_names=None, show_values=True):
self.trace_names = trace_names
self.show_values = show_values
def __call__(self, frame, event, arg):
if event == 'line':
lineno = frame.f_lineno
if '__file__' in frame.f_globals:
filename = frame.f_globals['__file__']
if (filename.endswith('.pyc') or
filename.endswith('.pyo')):
filename = filename[:-1]
name = frame.f_globals['__name__']
line = linecache.getline(filename, lineno)
else:
name = '[unknown]'
try:
src = inspect.getsourcelines(frame)
line = src[lineno]
except IOError:
line = 'Unknown code named [%s]. VM instruction #%d' % (
frame.f_code.co_name, frame.f_lasti)
if self.trace_names is None or name in self.trace_names:
print '%s:%s: %s' % (name, lineno, line.rstrip())
if not self.show_values:
return self
details = '\t'
tokens = line.translate(
string.maketrans(' ,.()', '\0' * 5)).split('\0')
for tok in tokens:
if tok in frame.f_globals:
details += '%s=%r ' % (tok, frame.f_globals[tok])
if tok in frame.f_locals:
details += '%s=%r ' % (tok, frame.f_locals[tok])
if details.strip():
print details
return self
def spew(trace_names=None, show_values=False):
"""Install a trace hook which writes incredibly detailed logs
about what code is being executed to stdout.
"""
sys.settrace(Spew(trace_names, show_values))
def unspew():
"""Remove the trace hook installed by spew.
"""
sys.settrace(None)
def hub_listener_stacks(state):
"""Toggles whether or not the hub records the stack when clients register
listeners on file descriptors. This can be useful when trying to figure
out what the hub is up to at any given moment. To inspect the stacks
of the current listeners, you have to iterate over them::
from eventlet import hubs
for reader in hubs.get_hub().get_readers():
print reader
"""
from eventlet import hubs
hubs.get_hub().set_debug_listeners(state)
def hub_exceptions(state):
"""Toggles whether the hub prints exceptions that are raised from its
timers. This can be useful to see how greenthreads are terminating.
"""
from eventlet import hubs
hubs.get_hub().set_timer_exceptions(state)
def tpool_exceptions(state):
"""Toggles whether tpool itself prints exceptions that are raised from
functions that are executed in it, in addition to raising them like
it normally does."""
from eventlet import tpool
tpool.QUIET = not state
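# Usage sketch (not part of this module): turning the debugging aids on at
# application startup:
#
#     from eventlet import debug
#     debug.spew()                     # trace every line executed
#     debug.hub_exceptions(True)       # print exceptions raised in hub timers
#     debug.hub_listener_stacks(True)  # record stacks when listeners register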


@@ -15,57 +15,16 @@ import warnings
from errno import EWOULDBLOCK, EAGAIN
__all__ = ['GreenSocket', 'GreenPipe']
__all__ = ['GreenSocket', 'GreenPipe', 'shutdown_safe']
def higher_order_recv(recv_func):
def recv(self, buflen, flags=0):
if self.act_non_blocking:
return self.fd.recv(buflen, flags)
buf = self.recvbuffer
if buf:
chunk, self.recvbuffer = buf[:buflen], buf[buflen:]
return chunk
fd = self.fd
bytes = recv_func(fd, buflen, flags)
if self.gettimeout():
end = time.time()+self.gettimeout()
else:
end = None
timeout = None
while bytes is None:
try:
if end:
timeout = end - time.time()
trampoline(fd, read=True, timeout=timeout, timeout_exc=socket.timeout)
except socket.timeout:
raise
except socket.error, e:
if e[0] == errno.EPIPE:
bytes = ''
else:
raise
else:
bytes = recv_func(fd, buflen, flags)
self.recvcount += len(bytes)
return bytes
return recv
CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK))
CONNECT_SUCCESS = set((0, errno.EISCONN))
def higher_order_send(send_func):
def send(self, data, flags=0):
if self.act_non_blocking:
return self.fd.send(data, flags)
count = send_func(self.fd, data, flags)
if not count:
return 0
self.sendcount += count
return count
return send
CONNECT_ERR = (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK)
CONNECT_SUCCESS = (0, errno.EISCONN)
def socket_connect(descriptor, address):
"""
Attempts to connect to the address. Returns the descriptor if it succeeds,
returns None if it needs to trampoline, and raises any exceptions.
"""
err = descriptor.connect_ex(address)
if err in CONNECT_ERR:
return None
@@ -75,6 +34,11 @@ def socket_connect(descriptor, address):
def socket_accept(descriptor):
"""
Attempts to accept() on the descriptor. Returns a (client, address) tuple
if it succeeds; returns None if it needs to trampoline, and raises
any exceptions.
"""
try:
return descriptor.accept()
except socket.error, e:
@@ -83,63 +47,23 @@ def socket_accept(descriptor):
raise
def socket_send(descriptor, data, flags=0):
try:
return descriptor.send(data, flags)
except socket.error, e:
if e[0] == errno.EWOULDBLOCK or e[0] == errno.ENOTCONN:
return 0
raise
if sys.platform[:3]=="win":
# winsock sometimes throws ENOTCONN
SOCKET_BLOCKING = (errno.EWOULDBLOCK,)
SOCKET_CLOSED = (errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN)
SOCKET_BLOCKING = set((errno.EWOULDBLOCK,))
SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN))
else:
# oddly, on linux/darwin, an unconnected socket is expected to block,
# so we treat ENOTCONN the same as EWOULDBLOCK
SOCKET_BLOCKING = (errno.EWOULDBLOCK, errno.ENOTCONN)
SOCKET_CLOSED = (errno.ECONNRESET, errno.ESHUTDOWN)
def socket_recv(descriptor, buflen, flags=0):
try:
return descriptor.recv(buflen, flags)
except socket.error, e:
if e[0] in SOCKET_BLOCKING:
return None
if e[0] in SOCKET_CLOSED:
return ''
raise
def file_recv(fd, buflen, flags=0):
try:
return fd.read(buflen)
except IOError, e:
if e[0] == EAGAIN:
return None
return ''
except socket.error, e:
if e[0] == errno.EPIPE:
return ''
raise
def file_send(fd, data, flags=0):
try:
fd.write(data)
fd.flush()
return len(data)
except IOError, e:
if e[0] == EAGAIN:
return 0
except ValueError, e:
written = 0
except socket.error, e:
if e[0] == errno.EPIPE:
written = 0
SOCKET_BLOCKING = set((errno.EWOULDBLOCK, errno.ENOTCONN))
SOCKET_CLOSED = set((errno.ECONNRESET, errno.ESHUTDOWN, errno.EPIPE))
def set_nonblocking(fd):
"""
Sets the descriptor to be nonblocking. Works on many file-like
objects as well as sockets. Only sockets can be nonblocking on
Windows, however.
"""
try:
setblocking = fd.setblocking
except AttributeError:
@@ -174,6 +98,10 @@ except ImportError:
class GreenSocket(object):
"""
Green version of the socket.socket class, intended to be 100%
API-compatible.
"""
timeout = None
def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
if isinstance(family_or_realsock, (int, long)):
@@ -190,9 +118,6 @@ class GreenSocket(object):
set_nonblocking(fd)
self.fd = fd
self._fileno = fd.fileno()
self.sendcount = 0
self.recvcount = 0
self.recvbuffer = ''
self.closed = False
self.timeout = socket.getdefaulttimeout()
@@ -317,9 +242,28 @@ class GreenSocket(object):
return socket._fileobject(self.dup(), mode, bufsize)
def makeGreenFile(self, mode='r', bufsize=-1):
return Green_fileobject(self.dup())
warnings.warn("makeGreenFile has been deprecated, please use "
"makefile instead", DeprecationWarning, stacklevel=2)
return self.makefile(mode, bufsize)
recv = higher_order_recv(socket_recv)
def recv(self, buflen, flags=0):
fd = self.fd
if self.act_non_blocking:
return fd.recv(buflen, flags)
while True:
try:
return fd.recv(buflen, flags)
except socket.error, e:
if e[0] in SOCKET_BLOCKING:
pass
elif e[0] in SOCKET_CLOSED:
return ''
else:
raise
trampoline(fd,
read=True,
timeout=self.gettimeout(),
timeout_exc=socket.timeout)
def recvfrom(self, *args):
if not self.act_non_blocking:
@@ -336,13 +280,25 @@ class GreenSocket(object):
trampoline(self.fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout)
return self.fd.recv_into(*args)
send = higher_order_send(socket_send)
def send(self, data, flags=0):
fd = self.fd
if self.act_non_blocking:
return fd.send(data, flags)
try:
return fd.send(data, flags)
except socket.error, e:
if e[0] in SOCKET_BLOCKING:
return 0
raise
def sendall(self, data, flags=0):
fd = self.fd
tail = self.send(data, flags)
while tail < len(data):
trampoline(self.fd, write=True, timeout_exc=socket.timeout)
trampoline(fd,
write=True,
timeout=self.gettimeout(),
timeout_exc=socket.timeout)
tail += self.send(data[tail:], flags)
def sendto(self, *args):
@@ -385,47 +341,80 @@ class GreenSocket(object):
return self.timeout
class Green_fileobject(object):
"""Green version of socket._fileobject, for use only with regular
sockets."""
class GreenPipe(object):
""" GreenPipe is a cooperatively-yielding wrapper around OS pipes.
"""
newlines = '\r\n'
mode = 'wb+'
def __init__(self, fd):
if isinstance(fd, GreenSocket):
set_nonblocking(fd.fd)
else:
set_nonblocking(fd)
self.sock = fd
self.fd = fd
self.closed = False
self.recvbuffer = ''
def close(self):
self.sock.close()
self.fd.close()
self.closed = True
def fileno(self):
return self.sock.fileno()
return self.fd.fileno()
# TODO next
def read(self, buflen, flags=0):
fd = self.fd
if buflen is None:
buflen = BUFFER_SIZE
buf = self.recvbuffer
if buf:
chunk, self.recvbuffer = buf[:buflen], buf[buflen:]
return chunk
while True:
try:
return fd.read(buflen)
except IOError, e:
if e[0] != EAGAIN:
return ''
except socket.error, e:
if e[0] == errno.EPIPE:
return ''
raise
trampoline(fd, read=True)
def write(self, data, flags=0):
fd = self.fd
tail = 0
len_data = len(data)
while tail < len_data:
tosend = data[tail:]
try:
fd.write(tosend)
fd.flush()
tail += len(tosend)
if tail == len_data:
return len_data
except IOError, e:
if e[0] != EAGAIN:
raise
except ValueError, e:
pass
except socket.error, e:
if e[0] != errno.EPIPE:
raise
trampoline(fd, write=True)
def flush(self):
pass
def write(self, data):
return self.sock.sendall(data)
def readuntil(self, terminator, size=None):
buf, self.sock.recvbuffer = self.sock.recvbuffer, ''
buf, self.recvbuffer = self.recvbuffer, ''
checked = 0
if size is None:
while True:
found = buf.find(terminator, checked)
if found != -1:
found += len(terminator)
chunk, self.sock.recvbuffer = buf[:found], buf[found:]
chunk, self.recvbuffer = buf[:found], buf[found:]
return chunk
checked = max(0, len(buf) - (len(terminator) - 1))
d = self.sock.recv(BUFFER_SIZE)
d = self.fd.read(BUFFER_SIZE)
if not d:
break
buf += d
@@ -434,14 +423,14 @@ class Green_fileobject(object):
found = buf.find(terminator, checked)
if found != -1:
found += len(terminator)
chunk, self.sock.recvbuffer = buf[:found], buf[found:]
chunk, self.recvbuffer = buf[:found], buf[found:]
return chunk
checked = len(buf)
d = self.sock.recv(BUFFER_SIZE)
d = self.fd.read(BUFFER_SIZE)
if not d:
break
buf += d
chunk, self.sock.recvbuffer = buf[:size], buf[size:]
chunk, self.recvbuffer = buf[:size], buf[size:]
return chunk
def readline(self, size=None):
@@ -450,9 +439,6 @@ class Green_fileobject(object):
def __iter__(self):
return self.xreadlines()
def readlines(self, size=None):
return list(self.xreadlines(size=size))
def xreadlines(self, size=None):
if size is None:
while True:
@@ -472,61 +458,6 @@ class Green_fileobject(object):
for line in lines:
self.write(line)
def read(self, size=None):
if size is not None and not isinstance(size, (int, long)):
raise TypeError('Expecting an int or long for size, got %s: %s' % (type(size), repr(size)))
buf, self.sock.recvbuffer = self.sock.recvbuffer, ''
lst = [buf]
if size is None:
while True:
d = self.sock.recv(BUFFER_SIZE)
if not d:
break
lst.append(d)
else:
buflen = len(buf)
while buflen < size:
d = self.sock.recv(BUFFER_SIZE)
if not d:
break
buflen += len(d)
lst.append(d)
else:
d = lst[-1]
overbite = buflen - size
if overbite:
lst[-1], self.sock.recvbuffer = d[:-overbite], d[-overbite:]
else:
lst[-1], self.sock.recvbuffer = d, ''
return ''.join(lst)
class GreenPipeSocket(GreenSocket):
""" This is a weird class that looks like a socket but expects a file descriptor as an argument instead of a socket.
"""
recv = higher_order_recv(file_recv)
send = higher_order_send(file_send)
class GreenPipe(Green_fileobject):
def __init__(self, fd):
set_nonblocking(fd)
self.fd = GreenPipeSocket(fd)
super(GreenPipe, self).__init__(self.fd)
def recv(self, *args, **kw):
fn = self.recv = self.fd.recv
return fn(*args, **kw)
def send(self, *args, **kw):
fn = self.send = self.fd.send
return fn(*args, **kw)
def flush(self):
self.fd.fd.flush()
# import SSL module here so we can refer to greenio.SSL.exceptionclass
try:

eventlet/greenpool.py Normal file

@@ -0,0 +1,200 @@
import itertools
from eventlet import greenthread
from eventlet import coros
__all__ = ['GreenPool', 'GreenPile']
try:
next
except NameError:
def next(it):
try:
return it.next()
except AttributeError:
raise TypeError("%s object is not an iterator" % type(it))
class GreenPool(object):
""" The GreenPool class is a pool of green threads.
"""
def __init__(self, size):
self.size = size
self.coroutines_running = set()
self.sem = coros.Semaphore(size)
self.no_coros_running = greenthread.Event()
def resize(self, new_size):
""" Change the max number of coroutines doing work at any given time.
If resize is called when there are more than *new_size*
coroutines already working on tasks, they will be allowed to complete
but no new tasks will be allowed to get launched until enough coroutines
finish their tasks to drop the overall quantity below *new_size*. Until
then, the return value of free() will be negative.
"""
size_delta = new_size - self.size
self.sem.counter += size_delta
self.size = new_size
def running(self):
""" Returns the number of coroutines that are currently executing
functions in the pool."""
return len(self.coroutines_running)
def free(self):
""" Returns the number of coroutines available for use.
If zero or less, the next call to :meth:`spawn` will block the calling
coroutine until a slot becomes available."""
return self.sem.counter
def spawn(self, function, *args, **kwargs):
"""Run the *function* with its arguments in its own green thread.
Returns the GreenThread object that is running the function, which can
be used to retrieve the results.
"""
# if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine
current = greenthread.getcurrent()
if self.sem.locked() and current in self.coroutines_running:
# a bit hacky to use the GT without switching to it
gt = greenthread.GreenThread(current)
gt.main(function, args, kwargs)
return gt
else:
self.sem.acquire()
gt = greenthread.spawn(function, *args, **kwargs)
if not self.coroutines_running:
self.no_coros_running = greenthread.Event()
self.coroutines_running.add(gt)
gt.link(self._spawn_done, coro=gt)
return gt
def _spawn_n_impl(self, func, args, kwargs, coro=None):
try:
try:
func(*args, **kwargs)
except (KeyboardInterrupt, SystemExit):
raise
except:
# TODO in debug mode print these
pass
finally:
if coro is None:
return
else:
coro = greenthread.getcurrent()
self._spawn_done(coro=coro)
def spawn_n(self, func, *args, **kwargs):
""" Create a coroutine to run the *function*. Returns None; the results
of the function are not retrievable.
"""
# if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine
current = greenthread.getcurrent()
if self.sem.locked() and current in self.coroutines_running:
self._spawn_n_impl(func, args, kwargs)
else:
self.sem.acquire()
g = greenthread.spawn_n(self._spawn_n_impl, func, args, kwargs, coro=True)
if not self.coroutines_running:
self.no_coros_running = greenthread.Event()
self.coroutines_running.add(g)
def waitall(self):
"""Waits until all coroutines in the pool are finished working."""
self.no_coros_running.wait()
def _spawn_done(self, result=None, exc=None, coro=None):
self.sem.release()
if coro is not None:
self.coroutines_running.remove(coro)
# if done processing (no more work is waiting for processing),
# send StopIteration so that the queue knows it's done
if self.sem.balance == self.size:
self.no_coros_running.send(None)
def waiting(self):
"""Return the number of coroutines waiting to spawn.
"""
if self.sem.balance < 0:
return -self.sem.balance
else:
return 0
def _do_imap(self, func, it, gi):
for args in it:
gi.spawn(func, *args)
gi.spawn(raise_stop_iteration)
def imap(self, function, *iterables):
"""This is the same as itertools.imap, except that *func* is
executed in separate green threads, with the concurrency controlled by
the pool. In operation, imap consumes a constant amount of memory,
proportional to the size of the pool, and is thus suited for iterating
over extremely long input lists.
"""
if function is None:
function = lambda *a: a
it = itertools.izip(*iterables)
gi = GreenImap(self.size)
greenthread.spawn_n(self._do_imap, function, it, gi)
return gi
def raise_stop_iteration():
raise StopIteration()
class GreenPile(object):
"""GreenPile is an abstraction representing a bunch of I/O-related tasks.
Construct a GreenPile with an existing GreenPool object. The GreenPile will
then use that pool's concurrency as it processes its jobs. There can be
many GreenPiles associated with a single GreenPool.
A GreenPile can also be constructed standalone, not associated with any
GreenPool. To do this, construct it with an integer size parameter instead
of a GreenPool
"""
def __init__(self, size_or_pool):
if isinstance(size_or_pool, GreenPool):
self.pool = size_or_pool
else:
self.pool = GreenPool(size_or_pool)
self.waiters = coros.Queue()
self.used = False
self.counter = 0
def spawn(self, func, *args, **kw):
"""Runs *func* in its own green thread, with the result available by
iterating over the GreenPile object."""
self.used = True
self.counter += 1
try:
gt = self.pool.spawn(func, *args, **kw)
self.waiters.send(gt)
except:
self.counter -= 1
raise
def __iter__(self):
return self
def next(self):
"""Wait for the next result, suspending the current coroutine until it
is available. Raises StopIteration when there are no more results."""
if self.counter == 0 and self.used:
raise StopIteration()
try:
return self.waiters.wait().wait()
finally:
self.counter -= 1
# this is identical to GreenPile but it blocks on spawn if the results
# aren't consumed
class GreenImap(GreenPile):
def __init__(self, size_or_pool):
super(GreenImap, self).__init__(size_or_pool)
self.waiters = coros.Channel(max_size=self.pool.size)

eventlet/greenthread.py Normal file

@@ -0,0 +1,384 @@
import sys
from eventlet import hubs
from eventlet import timer
from eventlet.support import greenlets as greenlet
__all__ = ['getcurrent', 'sleep', 'spawn', 'spawn_n', 'call_after_global', 'call_after_local', 'GreenThread', 'Event']
getcurrent = greenlet.getcurrent
def sleep(seconds=0):
"""Yield control to another eligible coroutine until at least *seconds* have
elapsed.
*seconds* may be specified as an integer, or a float if fractional seconds
are desired. Calling :func:`~eventlet.api.sleep` with *seconds* of 0 is the
canonical way of expressing a cooperative yield. For example, if one is
looping over a large list performing an expensive calculation without
calling any socket methods, it's a good idea to call ``sleep(0)``
occasionally; otherwise nothing else will run.
"""
hub = hubs.get_hub()
assert hub.greenlet is not greenlet.getcurrent(), 'do not call blocking functions from the mainloop'
timer = hub.schedule_call_global(seconds, greenlet.getcurrent().switch)
try:
hub.switch()
finally:
timer.cancel()
def spawn(func, *args, **kwargs):
"""Create a green thread to run func(*args, **kwargs). Returns a
GreenThread object which you can use to get the results of the call.
"""
hub = hubs.get_hub()
g = GreenThread(hub.greenlet)
hub.schedule_call_global(0, g.switch, func, args, kwargs)
return g
def _main_wrapper(func, args, kwargs):
# function that gets around the fact that greenlet.switch
# doesn't accept keyword arguments
return func(*args, **kwargs)
def spawn_n(func, *args, **kwargs):
"""Same as spawn, but returns a greenlet object from which it is not
possible to retrieve the results. This is slightly faster than spawn; it is
fastest if there are no keyword arguments."""
return _spawn_n(0, func, args, kwargs)[1]
def call_after_global(seconds, func, *args, **kwargs):
"""Schedule *function* to be called after *seconds* have elapsed.
The function will be scheduled even if the current greenlet has exited.
*seconds* may be specified as an integer, or a float if fractional seconds
are desired. The *function* will be called with the given *args* and
keyword arguments *kwargs*, and will be executed within the main loop's
coroutine.
Its return value is discarded. Any uncaught exception will be logged."""
return _spawn_n(seconds, func, args, kwargs)[0]
def call_after_local(seconds, function, *args, **kwargs):
"""Schedule *function* to be called after *seconds* have elapsed.
The function will NOT be called if the current greenlet has exited.
*seconds* may be specified as an integer, or a float if fractional seconds
are desired. The *function* will be called with the given *args* and
keyword arguments *kwargs*, and will be executed within the main loop's
coroutine.
Its return value is discarded. Any uncaught exception will be logged.
"""
hub = hubs.get_hub()
g = greenlet.greenlet(_main_wrapper, parent=hub.greenlet)
t = hub.schedule_call_local(seconds, g.switch, function, args, kwargs)
return t
call_after = call_after_local
class TimeoutError(Exception):
"""Exception raised if an asynchronous operation times out"""
pass
def exc_after(seconds, *throw_args):
"""Schedule an exception to be raised into the current coroutine
after *seconds* have elapsed.
This only works if the current coroutine is yielding, and is generally
used to set timeouts after which a network operation or series of
operations will be canceled.
Returns a :class:`~eventlet.timer.Timer` object with a
:meth:`~eventlet.timer.Timer.cancel` method which should be used to
prevent the exception if the operation completes successfully.
See also :func:`with_timeout`, which encapsulates the idiom below.
Example::
def read_with_timeout():
timer = exc_after(30, RuntimeError())
try:
httpc.get('http://www.google.com/')
except RuntimeError:
print "Timed out!"
else:
timer.cancel()
"""
if seconds is None: # dummy argument, do nothing
return timer.Timer(seconds, lambda: None)
hub = hubs.get_hub()
return hub.schedule_call_local(seconds, getcurrent().throw, *throw_args)
def with_timeout(seconds, func, *args, **kwds):
"""Wrap a call to some (yielding) function with a timeout; if the called
function fails to return before the timeout, cancel it and return a flag
value.
:param seconds: seconds before timeout occurs
:type seconds: int or float
:param func: the callable to execute with a timeout; must be one of the
functions that implicitly or explicitly yields
:param \*args: positional arguments to pass to *func*
:param \*\*kwds: keyword arguments to pass to *func*
:param timeout_value: value to return if timeout occurs (default raise
:class:`~eventlet.api.TimeoutError`)
:rtype: Value returned by *func* if *func* returns before *seconds*, else
*timeout_value* if provided, else raise ``TimeoutError``
:exception TimeoutError: if *func* times out and no ``timeout_value`` has
been provided.
:exception *any*: Any exception raised by *func*
**Example**::
data = with_timeout(30, httpc.get, 'http://www.google.com/', timeout_value="")
Here *data* is either the result of the ``get()`` call, or the empty string if
it took too long to return. Any exception raised by the ``get()`` call is
passed through to the caller.
"""
# Recognize a specific keyword argument, while also allowing pass-through
# of any other keyword arguments accepted by func. Use pop() so we don't
# pass timeout_value through to func().
has_timeout_value = "timeout_value" in kwds
timeout_value = kwds.pop("timeout_value", None)
error = TimeoutError()
timeout = exc_after(seconds, error)
try:
try:
return func(*args, **kwds)
except TimeoutError, ex:
if ex is error and has_timeout_value:
return timeout_value
raise
finally:
timeout.cancel()
def _spawn_n(seconds, func, args, kwargs):
hub = hubs.get_hub()
if kwargs:
g = greenlet.greenlet(_main_wrapper, parent=hub.greenlet)
t = hub.schedule_call_global(seconds, g.switch, func, args, kwargs)
else:
g = greenlet.greenlet(func, parent=hub.greenlet)
t = hub.schedule_call_global(seconds, g.switch, *args)
return t, g
class GreenThread(greenlet.greenlet):
def __init__(self, parent):
greenlet.greenlet.__init__(self, self.main, parent)
self._exit_event = Event()
def wait(self):
return self._exit_event.wait()
def link(self, func, *curried_args, **curried_kwargs):
""" Set up a function to be called with the results of the GreenThread.
The function must have the following signature:
def f(result=None, exc=None, [curried args/kwargs]):
"""
self._exit_funcs = getattr(self, '_exit_funcs', [])
self._exit_funcs.append((func, curried_args, curried_kwargs))
def main(self, function, args, kwargs):
try:
result = function(*args, **kwargs)
except:
self._exit_event.send_exception(*sys.exc_info())
# ca and ckw are the curried function arguments
for f, ca, ckw in getattr(self, '_exit_funcs', []):
f(exc=sys.exc_info(), *ca, **ckw)
raise
else:
self._exit_event.send(result)
for f, ca, ckw in getattr(self, '_exit_funcs', []):
f(result, *ca, **ckw)
class NOT_USED:
def __repr__(self):
return 'NOT_USED'
NOT_USED = NOT_USED()
class Event(object):
"""An abstraction where an arbitrary number of coroutines
can wait for one event from another.
Events differ from channels in two ways:
1. calling :meth:`send` does not unschedule the current coroutine
2. :meth:`send` can only be called once; use :meth:`reset` to prepare the
event for another :meth:`send`
They are ideal for communicating return values between coroutines.
>>> from eventlet import coros, api
>>> evt = coros.Event()
>>> def baz(b):
... evt.send(b + 1)
...
>>> _ = api.spawn(baz, 3)
>>> evt.wait()
4
"""
_result = None
def __init__(self):
self._waiters = set()
self.reset()
def __str__(self):
params = (self.__class__.__name__, hex(id(self)), self._result, self._exc, len(self._waiters))
return '<%s at %s result=%r _exc=%r _waiters[%d]>' % params
def reset(self):
""" Reset this event so it can be used to send again.
Can only be called after :meth:`send` has been called.
>>> from eventlet import coros
>>> evt = coros.Event()
>>> evt.send(1)
>>> evt.reset()
>>> evt.send(2)
>>> evt.wait()
2
Calling reset multiple times in a row is an error.
>>> evt.reset()
>>> evt.reset()
Traceback (most recent call last):
...
AssertionError: Trying to re-reset() a fresh event.
"""
assert self._result is not NOT_USED, 'Trying to re-reset() a fresh event.'
self._result = NOT_USED
self._exc = None
def ready(self):
""" Return true if the :meth:`wait` call will return immediately.
Used to avoid waiting for things that might take a while to time out.
For example, you can put a bunch of events into a list, and then visit
them all repeatedly, calling :meth:`ready` until one returns ``True``,
and then you can :meth:`wait` on that one."""
return self._result is not NOT_USED
def has_exception(self):
return self._exc is not None
def has_result(self):
return self._result is not NOT_USED and self._exc is None
def poll(self, notready=None):
if self.ready():
return self.wait()
return notready
# QQQ make it return tuple (type, value, tb) instead of raising
# because
# 1) "poll" does not imply raising
# 2) it's better not to screw up caller's sys.exc_info() by default
# (e.g. if caller wants to call the function in except or finally)
def poll_exception(self, notready=None):
if self.has_exception():
return self.wait()
return notready
def poll_result(self, notready=None):
if self.has_result():
return self.wait()
return notready
def wait(self):
"""Wait until another coroutine calls :meth:`send`.
Returns the value the other coroutine passed to
:meth:`send`.
>>> from eventlet import coros, api
>>> evt = coros.Event()
>>> def wait_on():
... retval = evt.wait()
... print "waited for", retval
>>> _ = api.spawn(wait_on)
>>> evt.send('result')
>>> api.sleep(0)
waited for result
Returns immediately if the event has already
occurred.
>>> evt.wait()
'result'
"""
current = getcurrent()
if self._result is NOT_USED:
self._waiters.add(current)
try:
return hubs.get_hub().switch()
finally:
self._waiters.discard(current)
if self._exc is not None:
current.throw(*self._exc)
return self._result
def send(self, result=None, exc=None):
"""Makes arrangements for the waiters to be woken with the
result and then returns immediately to the parent.
>>> from eventlet import coros, api
>>> evt = coros.Event()
>>> def waiter():
... print 'about to wait'
... result = evt.wait()
... print 'waited for', result
>>> _ = api.spawn(waiter)
>>> api.sleep(0)
about to wait
>>> evt.send('a')
>>> api.sleep(0)
waited for a
It is an error to call :meth:`send` multiple times on the same event.
>>> evt.send('whoops')
Traceback (most recent call last):
...
AssertionError: Trying to re-send() an already-triggered event.
Use :meth:`reset` between :meth:`send` calls to reuse an event object.
"""
assert self._result is NOT_USED, 'Trying to re-send() an already-triggered event.'
self._result = result
if exc is not None and not isinstance(exc, tuple):
exc = (exc, )
self._exc = exc
hub = hubs.get_hub()
if self._waiters:
hub.schedule_call_global(0, self._do_send, self._result, self._exc, self._waiters.copy())
def _do_send(self, result, exc, waiters):
while waiters:
waiter = waiters.pop()
if waiter in self._waiters:
if exc is None:
waiter.switch(result)
else:
waiter.throw(*exc)
def send_exception(self, *args):
# the arguments are the same as for greenlet.throw
return self.send(None, args)

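A minimal sketch of the ``GreenThread.wait``/``link`` API defined above; the ``double`` function is a placeholder::

    import eventlet
    from eventlet import greenthread

    def double(x):
        greenthread.sleep(0)   # cooperative yield
        return x * 2

    def on_done(result=None, exc=None):
        # called with the result (or the exc_info tuple) when double() exits
        print "finished with", result

    gt = greenthread.spawn(double, 21)
    gt.link(on_done)
    print gt.wait()   # 42, printed after on_done has run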
View File

@@ -25,7 +25,6 @@ class FdListener(object):
# in debug mode, track the call site that created the listener
class DebugListener(FdListener):
def __init__(self, evtype, fileno, cb):
import traceback
self.where_called = traceback.format_stack()
super(DebugListener, self).__init__(evtype, fileno, cb)
def __repr__(self):
@@ -61,7 +60,6 @@ class BaseHub(object):
'exit': [],
}
self.lclass = FdListener
self.silent_timer_exceptions = False
def add(self, evtype, fileno, cb):
""" Signals an intent to or write a particular file descriptor.
@@ -220,11 +218,15 @@ class BaseHub(object):
except:
self.squelch_observer_exception(observer, sys.exc_info())
def squelch_timer_exception(self, timer, exc_info):
if not self.silent_timer_exceptions:
def _silent_squelch_timer_exception(self, timer, exc_info):
pass
def _debug_squelch_timer_exception(self, timer, exc_info):
traceback.print_exception(*exc_info)
print >>sys.stderr, "Timer raised: %r" % (timer,)
squelch_timer_exception = _silent_squelch_timer_exception
def _add_absolute_timer(self, when, info):
# the 0 placeholder makes it easy to bisect_right using (now, 1)
self.next_timers.append((when, 0, info))
@@ -259,8 +261,6 @@ class BaseHub(object):
self.add_timer(t)
return t
schedule_call = schedule_call_local
def schedule_call_global(self, seconds, cb, *args, **kw):
"""Schedule a callable to be called after 'seconds' seconds have
elapsed. The timer will NOT be cancelled if the current greenlet has
@@ -302,13 +302,14 @@ class BaseHub(object):
def get_timers_count(hub):
return max(len(x) for x in [hub.timers, hub.next_timers])
def _setdebug(self, value):
def set_debug_listeners(self, value):
if value:
self.lclass = DebugListener
else:
self.lclass = FdListener
def _getdebug(self):
return self.lclass == DebugListener
debug = property(_getdebug, _setdebug)
def set_timer_exceptions(self, value):
if value:
self.squelch_timer_exception = self._debug_squelch_timer_exception
else:
self.squelch_timer_exception = self._silent_squelch_timer_exception

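A short sketch of the new hub toggles that replace the removed ``debug`` property and ``silent_timer_exceptions`` flag::

    from eventlet import hubs

    hub = hubs.get_hub()
    hub.set_debug_listeners(True)    # track the call site of each listener
    hub.set_timer_exceptions(True)   # print tracebacks raised inside timers
    # ... run the code under scrutiny ...
    hub.set_timer_exceptions(False)  # back to silently squelching them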
View File

@@ -96,8 +96,7 @@ class Hub(BaseHub):
self.schedule_call_global(0, api.getcurrent().parent.throw, *self.signal_exc_info)
self.signal_exc_info = None
else:
if not self.silent_timer_exceptions:
traceback.print_exc()
self.squelch_timer_exception(None, sys.exc_info())
def abort(self):
self.schedule_call_global(0, self.greenlet.throw, api.GreenletExit)

View File

@@ -1,8 +1,11 @@
# replacement of CoroutinePool implemented with proc module
from eventlet import coros, proc, api
class Pool(object):
import warnings
warnings.warn("The pool module is deprecated. Please use the "
"eventlet.GreenPool and eventlet.GreenPile classes instead.",
DeprecationWarning, stacklevel=2)
class Pool(object):
def __init__(self, min_size=0, max_size=4, track_events=False):
if min_size > max_size:
raise ValueError('min_size cannot be bigger than max_size')

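A migration sketch for code hitting the deprecation warning above. The commented-out line shows the old CoroutinePool-style idiom (the same one removed from ``examples/webcrawler.py`` below); the replacement uses GreenPool::

    import eventlet

    def work(n):
        return n + 1   # placeholder job

    # before: p = pool.Pool(max_size=4); waiter = p.execute(work, 1)
    p = eventlet.GreenPool(4)
    gt = p.spawn(work, 1)   # returns a GreenThread, analogous to the old waiter
    print gt.wait()         # 2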
View File

@@ -3,16 +3,7 @@ import collections
from eventlet import api
from eventlet import coros
class FanFailed(RuntimeError):
pass
class SomeFailed(FanFailed):
pass
class AllFailed(FanFailed):
pass
__all__ = ['Pool', 'TokenPool']
# have to stick this in an exec so it works in 2.4
try:
@@ -44,6 +35,9 @@ except ImportError:
class Pool(object):
"""
Pool is a base class that is meant to be subclassed. When subclassing,
define the :meth:`create` method to implement the desired resource.
When using the pool, if you do a get, you should **always** do a
:meth:`put`.
@@ -141,8 +135,8 @@ class Token(object):
class TokenPool(Pool):
"""A pool which gives out tokens, an object indicating that
the person who holds the token has a right to consume some
"""A pool which gives out tokens (opaque unique objects), which indicate
that the coroutine which holds the token has a right to consume some
limited resource.
"""
def create(self):

388
eventlet/queue.py Normal file
View File

@@ -0,0 +1,388 @@
# Copyright (c) 2009 Denis Bilenko. See LICENSE for details.
"""Synchronized queues.
The :mod:`eventlet.queue` module implements multi-producer, multi-consumer queues that work across greenlets, with the API similar to the classes found in the standard :mod:`Queue` and :class:`multiprocessing <multiprocessing.Queue>` modules.
A major difference is that queues in this module operate as channels when
initialized with *maxsize* of zero. In such case, both :meth:`Queue.empty`
and :meth:`Queue.full` return ``True`` and :meth:`Queue.put` always blocks until a call to :meth:`Queue.get` retrieves the item.
Another interesting difference is that :meth:`Queue.qsize`, :meth:`Queue.empty`, and :meth:`Queue.full` *can* be used as indicators of whether the subsequent :meth:`Queue.get` or :meth:`Queue.put` will not block.
"""
import sys
import heapq
import collections
import traceback  # used by Waiter.switch/throw to report callback errors
from Queue import Full, Empty
_NONE = object()
from eventlet.hubs import get_hub
from eventlet.greenthread import getcurrent, exc_after, Event
__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue']
class Waiter(object):
"""A low level synchronization class.
Wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them safe:
* switching will occur only if the waiting greenlet is executing :meth:`wait`
method currently. Otherwise, :meth:`switch` and :meth:`throw` are no-ops.
* any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet.
The :meth:`wait` method must be called from a greenlet other than :class:`Hub`.
"""
__slots__ = ['greenlet']
def __init__(self):
self.greenlet = None
def __repr__(self):
if self.waiting:
waiting = ' waiting'
else:
waiting = ''
return '<%s at %s%s greenlet=%r>' % (type(self).__name__, hex(id(self)), waiting, self.greenlet)
def __str__(self):
"""
>>> print Waiter()
<Waiter greenlet=None>
"""
if self.waiting:
waiting = ' waiting'
else:
waiting = ''
return '<%s%s greenlet=%s>' % (type(self).__name__, waiting, self.greenlet)
def __nonzero__(self):
return self.greenlet is not None
@property
def waiting(self):
return self.greenlet is not None
def switch(self, value=None):
"""Wake up the greenlet that is calling wait() currently (if there is one).
Can only be called from Hub's greenlet.
"""
assert getcurrent() is get_hub().greenlet, "Can only use Waiter.switch method from the mainloop"
if self.greenlet is not None:
try:
self.greenlet.switch(value)
except:
traceback.print_exc()
def throw(self, *throw_args):
"""Make greenlet calling wait() wake up (if there is a wait()).
Can only be called from Hub's greenlet.
"""
assert getcurrent() is get_hub().greenlet, "Can only use Waiter.throw method from the mainloop"
if self.greenlet is not None:
try:
self.greenlet.throw(*throw_args)
except:
traceback.print_exc()
# XXX should this be renamed to get()? and the whole class renamed to Receiver?
def wait(self):
"""Wait until switch() or throw() is called.
"""
assert self.greenlet is None, 'This Waiter is already used by %r' % (self.greenlet, )
self.greenlet = getcurrent()
try:
return get_hub().switch()
finally:
self.greenlet = None
class Queue(object):
"""Create a queue object with a given maximum size.
If *maxsize* is less than zero or ``None``, the queue size is infinite.
``Queue(0)`` is a channel, that is, its :meth:`put` method always blocks until the
item is delivered. (This is unlike the standard :class:`Queue`, where 0 means
infinite size).
"""
def __init__(self, maxsize=None):
if maxsize < 0:
self.maxsize = None
else:
self.maxsize = maxsize
self.getters = set()
self.putters = set()
self._event_unlock = None
self._init(maxsize)
# QQQ make maxsize into a property with setter that schedules unlock if necessary
def _init(self, maxsize):
self.queue = collections.deque()
def _get(self):
return self.queue.popleft()
def _put(self, item):
self.queue.append(item)
def __repr__(self):
return '<%s at %s %s>' % (type(self).__name__, hex(id(self)), self._format())
def __str__(self):
return '<%s %s>' % (type(self).__name__, self._format())
def _format(self):
result = 'maxsize=%r' % (self.maxsize, )
if getattr(self, 'queue', None):
result += ' queue=%r' % self.queue
if self.getters:
result += ' getters[%s]' % len(self.getters)
if self.putters:
result += ' putters[%s]' % len(self.putters)
if self._event_unlock is not None:
result += ' unlocking'
return result
def qsize(self):
"""Return the size of the queue."""
return len(self.queue)
def empty(self):
"""Return ``True`` if the queue is empty, ``False`` otherwise."""
return not self.qsize()
def full(self):
"""Return ``True`` if the queue is full, ``False`` otherwise.
``Queue(None)`` is never full.
"""
return self.maxsize is not None and self.qsize() >= self.maxsize
def put(self, item, block=True, timeout=None):
"""Put an item into the queue.
If optional arg *block* is true and *timeout* is ``None`` (the default),
block if necessary until a free slot is available. If *timeout* is
a positive number, it blocks at most *timeout* seconds and raises
the :class:`Full` exception if no free slot was available within that time.
Otherwise (*block* is false), put an item on the queue if a free slot
is immediately available, else raise the :class:`Full` exception (*timeout*
is ignored in that case).
"""
if self.maxsize is None or self.qsize() < self.maxsize:
# there's a free slot, put an item right away
self._put(item)
if self.getters:
self._schedule_unlock()
elif not block and get_hub().greenlet is getcurrent():
# we're in the mainloop, so we cannot wait; we can switch() to other greenlets though
# find a getter and deliver an item to it
while self.getters:
getter = self.getters.pop()
if getter:
self._put(item)
item = self._get()
getter.switch(item)
return
raise Full
elif block:
waiter = ItemWaiter(item)
self.putters.add(waiter)
timeout = exc_after(timeout, Full)
try:
if self.getters:
self._schedule_unlock()
result = waiter.wait()
assert result is waiter, "Invalid switch into Queue.put: %r" % (result, )
if waiter.item is not _NONE:
self._put(item)
finally:
timeout.cancel()
self.putters.discard(waiter)
else:
raise Full
def put_nowait(self, item):
"""Put an item into the queue without blocking.
Only enqueue the item if a free slot is immediately available.
Otherwise raise the :class:`Full` exception.
"""
self.put(item, False)
def get(self, block=True, timeout=None):
"""Remove and return an item from the queue.
If optional args *block* is true and *timeout* is ``None`` (the default),
block if necessary until an item is available. If *timeout* is a positive number,
it blocks at most *timeout* seconds and raises the :class:`Empty` exception
if no item was available within that time. Otherwise (*block* is false), return
an item if one is immediately available, else raise the :class:`Empty` exception
(*timeout* is ignored in that case).
"""
if self.qsize():
if self.putters:
self._schedule_unlock()
return self._get()
elif not block and get_hub().greenlet is getcurrent():
# special case to make get_nowait() runnable in the mainloop greenlet
# there are no items in the queue; try to fix the situation by unlocking putters
while self.putters:
putter = self.putters.pop()
if putter:
putter.switch(putter)
if self.qsize():
return self._get()
raise Empty
elif block:
waiter = Waiter()
timeout = exc_after(timeout, Empty)
try:
self.getters.add(waiter)
if self.putters:
self._schedule_unlock()
return waiter.wait()
finally:
self.getters.discard(waiter)
timeout.cancel()
else:
raise Empty
def get_nowait(self):
"""Remove and return an item from the queue without blocking.
Only get an item if one is immediately available. Otherwise
raise the :class:`Empty` exception.
"""
return self.get(False)
def _unlock(self):
try:
while True:
if self.qsize() and self.getters:
getter = self.getters.pop()
if getter:
try:
item = self._get()
except:
getter.throw(*sys.exc_info())
else:
getter.switch(item)
elif self.putters and self.getters:
putter = self.putters.pop()
if putter:
getter = self.getters.pop()
if getter:
item = putter.item
putter.item = _NONE # this prevents the greenlet calling put() from calling _put() again
self._put(item)
item = self._get()
getter.switch(item)
putter.switch(putter)
else:
self.putters.add(putter)
elif self.putters and (self.getters or self.qsize() < self.maxsize):
putter = self.putters.pop()
putter.switch(putter)
else:
break
finally:
self._event_unlock = None # QQQ maybe it's possible to obtain this info from libevent?
# i.e. whether this event is pending _OR_ currently executing
# testcase: 2 greenlets: while True: q.put(q.get()) - nothing else has a chance to execute
# to avoid this, schedule unlock with timer(0, ...) once in a while
def _schedule_unlock(self):
if self._event_unlock is None:
self._event_unlock = get_hub().schedule_call_global(0, self._unlock)
# QQQ re-activate event (with event_active libevent call) instead of creating a new one each time
class ItemWaiter(Waiter):
__slots__ = ['item']
def __init__(self, item):
Waiter.__init__(self)
self.item = item
class PriorityQueue(Queue):
'''A subclass of :class:`Queue` that retrieves entries in priority order (lowest first).
Entries are typically tuples of the form: ``(priority number, data)``.
'''
def _init(self, maxsize):
self.queue = []
def _put(self, item, heappush=heapq.heappush):
heappush(self.queue, item)
def _get(self, heappop=heapq.heappop):
return heappop(self.queue)
class LifoQueue(Queue):
'''A subclass of :class:`Queue` that retrieves most recently added entries first.'''
def _init(self, maxsize):
self.queue = []
def _put(self, item):
self.queue.append(item)
def _get(self):
return self.queue.pop()
class JoinableQueue(Queue):
'''A subclass of :class:`Queue` that additionally has :meth:`task_done` and :meth:`join` methods.'''
def __init__(self, maxsize=None):
Queue.__init__(self, maxsize)
self.unfinished_tasks = 0
self._cond = Event()
def _format(self):
result = Queue._format(self)
if self.unfinished_tasks:
result += ' tasks=%s _cond=%s' % (self.unfinished_tasks, self._cond)
return result
def _put(self, item):
Queue._put(self, item)
self.unfinished_tasks += 1
if self._cond.ready():
self._cond.reset()
def task_done(self):
'''Indicate that a formerly enqueued task is complete. Used by queue consumer threads.
For each :meth:`get <Queue.get>` used to fetch a task, a subsequent call to :meth:`task_done` tells the queue
that the processing on the task is complete.
If a :meth:`join` is currently blocking, it will resume when all items have been processed
(meaning that a :meth:`task_done` call was received for every item that had been
:meth:`put <Queue.put>` into the queue).
Raises a :exc:`ValueError` if called more times than there were items placed in the queue.
'''
if self.unfinished_tasks <= 0:
raise ValueError('task_done() called too many times')
self.unfinished_tasks -= 1
if self.unfinished_tasks == 0:
self._cond.send(None)
def join(self):
'''Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue.
The count goes down whenever a consumer thread calls :meth:`task_done` to indicate
that the item was retrieved and all work on it is complete. When the count of
unfinished tasks drops to zero, :meth:`join` unblocks.
'''
self._cond.wait()

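A minimal sketch of the semantics above, including the *maxsize* of zero channel case; ``eventlet.Queue`` is assumed to be the package-level alias used by the new tests below::

    import eventlet
    from eventlet import queue

    q = eventlet.Queue(1)     # bounded queue
    q.put('hi')
    print q.get()             # 'hi'

    ch = eventlet.Queue(0)    # a channel: put() blocks until get() takes the item
    def sender():
        ch.put('hello')
    gt = eventlet.spawn(sender)
    print ch.get()            # 'hello'
    gt.wait()

    jq = queue.JoinableQueue()
    jq.put('task')
    print jq.get()            # 'task'
    jq.task_done()            # join() would now return immediately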
View File

@@ -1,4 +1,4 @@
from eventlet.api import getcurrent
from eventlet.support import greenlets as greenlet
from eventlet.hubs import get_hub
""" If true, captures a stack trace for each timer when constructed. This is
@@ -74,7 +74,7 @@ class Timer(object):
class LocalTimer(Timer):
def __init__(self, *args, **kwargs):
self.greenlet = getcurrent()
self.greenlet = greenlet.getcurrent()
Timer.__init__(self, *args, **kwargs)
@property

View File

@@ -21,6 +21,8 @@ from Queue import Empty, Queue
from eventlet import api, coros, greenio
__all__ = ['execute', 'Proxy', 'killall']
QUIET=True
_rfile = _wfile = None
@@ -90,8 +92,14 @@ def erecv(e):
def execute(meth,*args, **kwargs):
"""
Execute *meth* in a thread, blocking the current coroutine until the method
completes.
Execute *meth* in a Python thread, blocking the current coroutine/
greenthread until the method completes.
The primary use case for this is to wrap an object or module that is not
amenable to monkeypatching or any of the other tricks that Eventlet uses
to achieve cooperative yielding. With tpool, you can force such objects to
cooperate with green threads by sticking them in native threads, at the cost
of some overhead.
"""
setup()
e = esend(meth,*args,**kwargs)
@@ -103,9 +111,13 @@ def proxy_call(autowrap, f, *args, **kwargs):
"""
Call a function *f* and returns the value. If the type of the return value
is in the *autowrap* collection, then it is wrapped in a :class:`Proxy`
object before return. Normally *f* will be called nonblocking with the
execute method; if the keyword argument "nonblocking" is set to ``True``,
it will simply be executed directly.
object before return.
Normally *f* will be called in the threadpool with :func:`execute`; if the
keyword argument "nonblocking" is set to ``True``, it will simply be
executed directly. This is useful if you have an object which has methods
that don't need to be called in a separate thread, but which return objects
that should be Proxy wrapped.
"""
if kwargs.pop('nonblocking',False):
rv = f(*args, **kwargs)
@@ -118,11 +130,17 @@ def proxy_call(autowrap, f, *args, **kwargs):
class Proxy(object):
"""
a simple proxy-wrapper of any object that comes with a methods-only
interface, in order to forward every method invocation onto a thread in the
native-thread pool. A key restriction is that the object's methods cannot
call into eventlets, since the eventlet dispatcher runs on a different
native thread. This is for running native-threaded code only.
A simple proxy-wrapper of any object, in order to forward every method
invocation onto a thread in the native-thread pool. A key restriction is
that the object's methods cannot use Eventlet primitives without great care,
since the Eventlet dispatcher runs on a different native thread.
Construct the Proxy with the instance that you want proxied. The optional
parameter *autowrap* is used when methods are called on the proxied object.
If a method on the proxied object returns something whose type is in
*autowrap*, then that object gets a Proxy wrapped around it, too. An
example use case for this is ensuring that DB-API connection objects
return cursor objects that are also Proxy-wrapped.
"""
def __init__(self, obj,autowrap=()):
self._obj = obj

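A sketch of the autowrap behavior documented above, with sqlite3 standing in as a hypothetical DB-API module::

    from eventlet import tpool
    import sqlite3

    # check_same_thread=False lets the tpool's native threads use the connection
    raw = sqlite3.connect(':memory:', check_same_thread=False)
    # method calls on conn run on a native thread; cursors returned by
    # those calls get wrapped in a Proxy as well
    conn = tpool.Proxy(raw, autowrap=(sqlite3.Cursor,))
    curs = conn.execute('create table t (x integer)')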
View File

@@ -8,7 +8,8 @@ from twisted.python import failure
from eventlet import proc
from eventlet.api import getcurrent
from eventlet.coros import Queue, Event
from eventlet.coros import Queue
from eventlet.greenthread import Event
class ValueQueue(Queue):

View File

@@ -15,7 +15,9 @@ DEFAULT_MAX_SIMULTANEOUS_REQUESTS = 1024
DEFAULT_MAX_HTTP_VERSION = 'HTTP/1.1'
MAX_REQUEST_LINE = 8192
MINIMUM_CHUNK_SIZE = 4096
DEFAULT_LOG_FORMAT='%(client_ip)s - - [%(date_time)s] "%(request_line)s" %(status_code)s %(body_length)s %(wall_seconds).6f'
__all__ = ['server', 'format_date_time']
# Weekday and month names for HTTP date/time formatting; always English!
_weekdayname = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
@@ -24,6 +26,7 @@ _monthname = [None, # Dummy so we can use 1-based month numbers
"Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
def format_date_time(timestamp):
"""Formats a unix timestamp into an HTTP standard string."""
year, month, day, hh, mm, ss, wd, y, z = time.gmtime(timestamp)
return "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
_weekdayname[wd], day, _monthname[month], year, hh, mm, ss
@@ -328,13 +331,13 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
pass
finish = time.time()
self.server.log_message('%s - - [%s] "%s" %s %s %.6f' % (
self.get_client_ip(),
self.log_date_time_string(),
self.requestline,
status_code[0],
length[0],
finish - start))
self.server.log_message(self.server.log_format % dict(
client_ip=self.get_client_ip(),
date_time=self.log_date_time_string(),
request_line=self.requestline,
status_code=status_code[0],
body_length=length[0],
wall_seconds=finish - start))
def get_client_ip(self):
client_ip = self.client_address[0]
@@ -413,8 +416,9 @@ class Server(BaseHTTPServer.HTTPServer):
max_http_version=None,
protocol=HttpProtocol,
minimum_chunk_size=None,
log_x_forwarded_for=True,
keepalive=True,
log_x_forwarded_for=True):
log_format=DEFAULT_LOG_FORMAT):
self.outstanding_requests = 0
self.socket = socket
@@ -432,6 +436,7 @@ class Server(BaseHTTPServer.HTTPServer):
if minimum_chunk_size is not None:
protocol.minimum_chunk_size = minimum_chunk_size
self.log_x_forwarded_for = log_x_forwarded_for
self.log_format = log_format
def get_environ(self):
socket = self.socket
@@ -465,24 +470,36 @@ def server(sock, site,
server_event=None,
minimum_chunk_size=None,
log_x_forwarded_for=True,
custom_pool=None,
keepalive=True,
custom_pool=None):
""" Start up a wsgi server handling requests from the supplied server socket.
This function loops forever. The *sock* object will be closed after server exits,
log_format=DEFAULT_LOG_FORMAT):
""" Start up a wsgi server handling requests from the supplied server
socket. This function loops forever. The *sock* object will be closed after server exits,
but the underlying file descriptor will remain open, so if you have a dup() of *sock*,
it will remain usable.
:param sock: Server socket, must be already bound to a port and listening.
:param site: WSGI application function.
:param log: File-like object that logs should be written to. If not specified, sys.stderr is used.
:param environ: Additional parameters that go into the environ dictionary of every request.
:param max_size: Maximum number of client connections opened at any time by this server.
:param max_http_version: Set to "HTTP/1.0" to make the server pretend it only supports HTTP 1.0. The primary reason to do this is to prevent clients from keeping connections open with keepalives.
:param protocol: Protocol class. Deprecated.
:param server_event: Used to collect the Server object. Deprecated.
:param minimum_chunk_size: Minimum size in bytes for http chunks. This can be used to improve performance of applications which yield many small strings, though using it technically violates the WSGI spec.
:param log_x_forwarded_for: If True (the default), logs the contents of the x-forwarded-for header in addition to the actual client ip address in the 'client_ip' field of the log line.
:param custom_pool: A custom Pool instance which is used to spawn client green threads. If this is supplied, max_size is ignored.
:param log_format: A python format string that is used as the template to generate log lines. The following values can be formatted into it: client_ip, date_time, request_line, status_code, body_length, wall_seconds. Look at the default for an example of how to use this.
"""
serv = Server(sock, sock.getsockname(),
site, log,
environ=None,
max_http_version=max_http_version,
protocol=protocol,
minimum_chunk_size=minimum_chunk_size,
log_x_forwarded_for=log_x_forwarded_for,
keepalive=keepalive,
log_x_forwarded_for=log_x_forwarded_for)
log_format=log_format)
if server_event is not None:
server_event.send(serv)
if max_size is None:

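A sketch of the new *log_format* parameter in use; the application and port are placeholders::

    from eventlet import wsgi
    from eventlet.green import socket

    def app(env, start_response):
        start_response('200 OK', [('Content-Type', 'text/plain')])
        return ['ok\r\n']

    sock = socket.socket()
    sock.bind(('', 8090))
    sock.listen(500)
    wsgi.server(sock, app,
                log_format='%(client_ip)s "%(request_line)s" '
                           '%(status_code)s %(wall_seconds).6f')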
View File

@@ -10,24 +10,30 @@ You terminate your connection by terminating telnet (typically Ctrl-]
and then 'quit')
"""
from eventlet import api
import eventlet
from eventlet.green import socket
def handle_socket(reader, writer):
def handle(reader, writer):
print "client connected"
while True:
# pass through every non-eof line
x = reader.readline()
if not x: break
writer.write(x)
print "echoed", x
writer.flush()
print "echoed", x,
print "client disconnected"
print "server socket listening on port 6000"
server = api.tcp_listener(('0.0.0.0', 6000))
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 6000))
server.listen(50)
pool = eventlet.GreenPool(10000)
while True:
try:
new_sock, address = server.accept()
except KeyboardInterrupt:
print "accepted", address
pool.spawn_n(handle, new_sock.makefile('r'), new_sock.makefile('w'))
except (SystemExit, KeyboardInterrupt):
break
# handle every new connection with a new coroutine
api.spawn(handle_socket, new_sock.makefile('r'), new_sock.makefile('w'))

View File

@@ -2,32 +2,27 @@
"""\
@file webcrawler.py
This is a simple web "crawler" that fetches a bunch of urls using a coroutine pool. It fetches as
many urls at time as coroutines in the pool.
This is a simple web "crawler" that fetches a bunch of urls using a pool to
control the number of outbound connections. It has as many simultaneously open
connections as coroutines in the pool.
The prints in the body of the fetch function are there to demonstrate that the
requests are truly made in parallel.
"""
urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
"http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif",
"http://eventlet.net"]
"https://wiki.secondlife.com/w/images/secondlife.jpg",
"http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]
import time
import eventlet
from eventlet.green import urllib2
from eventlet import coros
def fetch(url):
# we could do something interesting with the result, but this is
# example code, so we'll just report that we did it
print "%s fetching %s" % (time.asctime(), url)
req = urllib2.urlopen(url)
print "%s fetched %s (%s)" % (time.asctime(), url, len(req.read()))
pool = coros.CoroutinePool(max_size=4)
waiters = []
for url in urls:
waiters.append(pool.execute(fetch, url))
# wait for all the coroutines to come back before exiting the process
for waiter in waiters:
waiter.wait()
print "opening", url
body = urllib2.urlopen(url).read()
print "done with", url
return url, body
pool = eventlet.GreenPool(200)
for url, body in pool.imap(fetch, urls):
print "got body from", url, "of length", len(body)

View File

@@ -5,8 +5,8 @@ multiple threads, and graceful code reloading, see:
http://pypi.python.org/pypi/Spawning/
"""
from eventlet import api, wsgi
from eventlet import wsgi
from eventlet.green import socket
def hello_world(env, start_response):
if env['PATH_INFO'] != '/':
@@ -15,6 +15,8 @@ def hello_world(env, start_response):
start_response('200 OK', [('Content-Type', 'text/plain')])
return ['Hello, World!\r\n']
sock = socket.socket()
sock.bind(('', 8090))
sock.listen(500)
wsgi.server(api.tcp_listener(('', 8080)), hello_world)
wsgi.server(sock, hello_world)

View File

@@ -106,20 +106,6 @@ class LimitedTestCase(unittest.TestCase):
self.timer.cancel()
class SilencedTestCase(LimitedTestCase):
""" Subclass of LimitedTestCase that also silences the printing of timer
exceptions."""
def setUp(self):
from eventlet import hubs
super(SilencedTestCase, self).setUp()
hubs.get_hub().silent_timer_exceptions = True
def tearDown(self):
from eventlet import hubs
super(SilencedTestCase, self).tearDown()
hubs.get_hub().silent_timer_exceptions = False
def find_command(command):
for dir in os.getenv('PATH', '/usr/bin:/usr/sbin').split(os.pathsep):
p = os.path.join(dir, command)

29
tests/backdoor_test.py Normal file
View File

@@ -0,0 +1,29 @@
import eventlet
from eventlet import backdoor
from eventlet.green import socket
from tests import LimitedTestCase, main
class BackdoorTest(LimitedTestCase):
def test_server(self):
listener = socket.socket()
listener.bind(('localhost', 0))
listener.listen(50)
serv = eventlet.spawn(backdoor.backdoor_server, listener)
client = socket.socket()
client.connect(('localhost', listener.getsockname()[1]))
f = client.makefile()
self.assert_('Python' in f.readline())
f.readline() # build info
f.readline() # help info
self.assert_('InteractiveConsole' in f.readline())
self.assertEquals('>>> ', f.read(4))
f.write('print "hi"\n')
f.flush()
self.assertEquals('hi\n', f.readline())
self.assertEquals('>>> ', f.read(4))
if __name__ == '__main__':
main()

View File

@@ -1,8 +1,8 @@
from unittest import main, TestCase
from tests import SilencedTestCase
from tests import LimitedTestCase
from eventlet import coros, api
class TestEvent(SilencedTestCase):
class TestEvent(LimitedTestCase):
def test_waiting_for_event(self):
evt = coros.Event()
value = 'some stuff'
@@ -74,7 +74,7 @@ class IncrActor(coros.Actor):
if evt: evt.send()
class TestActor(SilencedTestCase):
class TestActor(LimitedTestCase):
mode = 'static'
def setUp(self):
super(TestActor, self).setUp()

View File

@@ -1,6 +1,6 @@
from tests import skipped, LimitedTestCase, skip_with_pyevent, TestIsTakingTooLong
from unittest import main
from eventlet import api, util, coros, proc, greenio
from eventlet import api, util, coros, proc, greenio, hubs
from eventlet.green.socket import GreenSSLObject
import errno
import os
@@ -47,11 +47,15 @@ class TestGreenIo(LimitedTestCase):
# by closing the socket prior to using the made file
try:
conn, addr = listener.accept()
fd = conn.makeGreenFile()
fd = conn.makefile()
conn.close()
fd.write('hello\n')
fd.close()
self.assertRaises(socket.error, fd.write, 'a')
# socket._fileobjects are odd: writes don't check
# whether the socket is closed or not, and you get an
# AttributeError during flush if it is closed
fd.write('a')
self.assertRaises(Exception, fd.flush)
self.assertRaises(socket.error, conn.send, 'b')
finally:
listener.close()
@@ -61,19 +65,20 @@ class TestGreenIo(LimitedTestCase):
# by closing the made file and then sending a character
try:
conn, addr = listener.accept()
fd = conn.makeGreenFile()
fd = conn.makefile()
fd.write('hello')
fd.close()
conn.send('\n')
conn.close()
self.assertRaises(socket.error, fd.write, 'a')
fd.write('a')
self.assertRaises(Exception, fd.flush)
self.assertRaises(socket.error, conn.send, 'b')
finally:
listener.close()
def did_it_work(server):
client = api.connect_tcp(('127.0.0.1', server.getsockname()[1]))
fd = client.makeGreenFile()
fd = client.makefile()
client.close()
assert fd.readline() == 'hello\n'
assert fd.read() == ''
@@ -96,16 +101,17 @@ class TestGreenIo(LimitedTestCase):
# closing the file object should close everything
try:
conn, addr = listener.accept()
conn = conn.makeGreenFile()
conn = conn.makefile()
conn.write('hello\n')
conn.close()
self.assertRaises(socket.error, conn.write, 'a')
conn.write('a')
self.assertRaises(Exception, conn.flush)
finally:
listener.close()
server = api.tcp_listener(('0.0.0.0', 0))
killer = coros.execute(accept_once, server)
client = api.connect_tcp(('127.0.0.1', server.getsockname()[1]))
fd = client.makeGreenFile()
fd = client.makefile()
client.close()
assert fd.read() == 'hello\n'
assert fd.read() == ''
@@ -242,6 +248,8 @@ class TestGreenIo(LimitedTestCase):
ssl_sock = ssl.wrap_socket(sock)
def test_exception_squelching(self):
return # exception squelching disabled for now (greenthread doesn't
# re-raise exceptions to the hub)
server = api.tcp_listener(('0.0.0.0', 0))
client = api.connect_tcp(('127.0.0.1', server.getsockname()[1]))
client_2, addr = server.accept()
@@ -260,7 +268,8 @@ class TestGreenIo(LimitedTestCase):
api.sleep(0)
finally:
sys.stderr = orig
self.assert_('Traceback' in fake.getvalue())
self.assert_('Traceback' in fake.getvalue(),
"Traceback not in:\n" + fake.getvalue())
if __name__ == '__main__':
main()

444
tests/greenpool_test.py Normal file
View File

@@ -0,0 +1,444 @@
import gc
import itertools
import os
import random
import eventlet
from eventlet import api
from eventlet import hubs, greenpool, coros, greenthread
import tests
class Spawn(tests.LimitedTestCase):
# TODO: move this test elsewhere
def test_simple(self):
def f(a, b=None):
return (a,b)
gt = eventlet.spawn(f, 1, b=2)
self.assertEquals(gt.wait(), (1,2))
def passthru(a):
eventlet.sleep(0.01)
return a
def passthru2(a, b):
eventlet.sleep(0.01)
return a,b
class GreenPool(tests.LimitedTestCase):
def test_spawn(self):
p = greenpool.GreenPool(4)
waiters = []
for i in xrange(10):
waiters.append(p.spawn(passthru, i))
results = [waiter.wait() for waiter in waiters]
self.assertEquals(results, list(xrange(10)))
def test_spawn_n(self):
p = greenpool.GreenPool(4)
results_closure = []
def do_something(a):
eventlet.sleep(0.01)
results_closure.append(a)
for i in xrange(10):
p.spawn(do_something, i)
p.waitall()
self.assertEquals(results_closure, range(10))
def test_waiting(self):
pool = greenpool.GreenPool(1)
done = greenthread.Event()
def consume():
done.wait()
def waiter(pool):
gt = pool.spawn(consume)
gt.wait()
waiters = []
self.assertEqual(pool.running(), 0)
waiters.append(eventlet.spawn(waiter, pool))
eventlet.sleep(0)
self.assertEqual(pool.waiting(), 0)
waiters.append(eventlet.spawn(waiter, pool))
eventlet.sleep(0)
self.assertEqual(pool.waiting(), 1)
waiters.append(eventlet.spawn(waiter, pool))
eventlet.sleep(0)
self.assertEqual(pool.waiting(), 2)
self.assertEqual(pool.running(), 1)
done.send(None)
for w in waiters:
w.wait()
self.assertEqual(pool.waiting(), 0)
self.assertEqual(pool.running(), 0)
def test_multiple_coros(self):
evt = greenthread.Event()
results = []
def producer():
results.append('prod')
evt.send()
def consumer():
results.append('cons1')
evt.wait()
results.append('cons2')
pool = greenpool.GreenPool(2)
done = pool.spawn(consumer)
pool.spawn_n(producer)
done.wait()
self.assertEquals(['cons1', 'prod', 'cons2'], results)
def test_timer_cancel(self):
# this test verifies that local timers are not fired
# outside of the context of the spawn
timer_fired = []
def fire_timer():
timer_fired.append(True)
def some_work():
hubs.get_hub().schedule_call_local(0, fire_timer)
pool = greenpool.GreenPool(2)
worker = pool.spawn(some_work)
worker.wait()
eventlet.sleep(0)
eventlet.sleep(0)
self.assertEquals(timer_fired, [])
def test_reentrant(self):
pool = greenpool.GreenPool(1)
def reenter():
waiter = pool.spawn(lambda a: a, 'reenter')
self.assertEqual('reenter', waiter.wait())
outer_waiter = pool.spawn(reenter)
outer_waiter.wait()
evt = greenthread.Event()
def reenter_async():
pool.spawn_n(lambda a: a, 'reenter')
evt.send('done')
pool.spawn_n(reenter_async)
self.assertEquals('done', evt.wait())
def assert_pool_has_free(self, pool, num_free):
def wait_long_time(e):
e.wait()
timer = api.exc_after(1, api.TimeoutError)
try:
evt = greenthread.Event()
for x in xrange(num_free):
pool.spawn(wait_long_time, evt)
# if the pool has fewer free than we expect,
# then we'll hit the timeout error
finally:
timer.cancel()
# if the runtime error is not raised it means the pool had
# some unexpected free items
timer = api.exc_after(0, RuntimeError)
try:
self.assertRaises(RuntimeError, pool.spawn, wait_long_time, evt)
finally:
timer.cancel()
# clean up by causing all the wait_long_time functions to return
evt.send(None)
eventlet.sleep(0)
eventlet.sleep(0)
def test_resize(self):
pool = greenpool.GreenPool(2)
evt = greenthread.Event()
def wait_long_time(e):
e.wait()
pool.spawn(wait_long_time, evt)
pool.spawn(wait_long_time, evt)
self.assertEquals(pool.free(), 0)
self.assertEquals(pool.running(), 2)
self.assert_pool_has_free(pool, 0)
# verify that the pool discards excess items put into it
pool.resize(1)
# cause the wait_long_time functions to return, which will
# trigger puts to the pool
evt.send(None)
eventlet.sleep(0)
eventlet.sleep(0)
self.assertEquals(pool.free(), 1)
self.assertEquals(pool.running(), 0)
self.assert_pool_has_free(pool, 1)
# resize larger and assert that there are more free items
pool.resize(2)
self.assertEquals(pool.free(), 2)
self.assertEquals(pool.running(), 0)
self.assert_pool_has_free(pool, 2)
def test_pool_smash(self):
# The premise is that a coroutine in a Pool tries to get a token out
# of a token pool but times out before getting the token. We verify
# that neither pool is adversely affected by this situation.
from eventlet import pools
pool = greenpool.GreenPool(1)
tp = pools.TokenPool(max_size=1)
token = tp.get() # empty out the pool
def do_receive(tp):
timer = api.exc_after(0, RuntimeError())
try:
t = tp.get()
self.fail("Shouldn't have recieved anything from the pool")
except RuntimeError:
return 'timed out'
else:
timer.cancel()
# the spawn makes the token pool expect that coroutine, but then
# immediately cuts bait
e1 = pool.spawn(do_receive, tp)
self.assertEquals(e1.wait(), 'timed out')
# the pool can get some random item back
def send_wakeup(tp):
tp.put('wakeup')
gt = eventlet.spawn(send_wakeup, tp)
# now we ask the pool to run something else, which should not
# be affected by the previous send at all
def resume():
return 'resumed'
e2 = pool.spawn(resume)
self.assertEquals(e2.wait(), 'resumed')
# we should be able to get out the thing we put in there, too
self.assertEquals(tp.get(), 'wakeup')
gt.wait()
def test_spawn_n_2(self):
p = greenpool.GreenPool(2)
self.assertEqual(p.free(), 2)
r = []
def foo(a):
r.append(a)
gt = p.spawn(foo, 1)
self.assertEqual(p.free(), 1)
gt.wait()
self.assertEqual(r, [1])
eventlet.sleep(0)
self.assertEqual(p.free(), 2)
# Once the pool is exhausted, spawning forces a yield.
p.spawn_n(foo, 2)
self.assertEqual(1, p.free())
self.assertEqual(r, [1])
p.spawn_n(foo, 3)
self.assertEqual(0, p.free())
self.assertEqual(r, [1])
p.spawn_n(foo, 4)
self.assertEqual(set(r), set([1,2,3]))
eventlet.sleep(0)
self.assertEqual(set(r), set([1,2,3,4]))
def test_imap(self):
p = greenpool.GreenPool(4)
result_list = list(p.imap(passthru, xrange(10)))
self.assertEquals(result_list, list(xrange(10)))
def test_empty_imap(self):
p = greenpool.GreenPool(4)
result_iter = p.imap(passthru, [])
self.assertRaises(StopIteration, result_iter.next)
def test_imap_nonefunc(self):
p = greenpool.GreenPool(4)
result_list = list(p.imap(None, xrange(10)))
self.assertEquals(result_list, [(x,) for x in xrange(10)])
def test_imap_multi_args(self):
p = greenpool.GreenPool(4)
result_list = list(p.imap(passthru2, xrange(10), xrange(10, 20)))
self.assertEquals(result_list, list(itertools.izip(xrange(10), xrange(10,20))))
def test_imap_raises(self):
# testing the case where the function raises an exception;
# both that the caller sees that exception, and that the iterator
# continues to be usable to get the rest of the items
p = greenpool.GreenPool(4)
def raiser(item):
if item == 1 or item == 7:
raise RuntimeError("intentional error")
else:
return item
it = p.imap(raiser, xrange(10))
results = []
while True:
try:
results.append(it.next())
except RuntimeError:
results.append('r')
except StopIteration:
break
self.assertEquals(results, [0,'r',2,3,4,5,6,'r',8,9])
class GreenPile(tests.LimitedTestCase):
def test_pile(self):
p = greenpool.GreenPile(4)
for i in xrange(10):
p.spawn(passthru, i)
result_list = list(p)
self.assertEquals(result_list, list(xrange(10)))
def test_pile_spawn_times_out(self):
p = greenpool.GreenPile(4)
for i in xrange(4):
p.spawn(passthru, i)
# now it should be full and this should time out
api.exc_after(0, api.TimeoutError)
self.assertRaises(api.TimeoutError, p.spawn, passthru, "time out")
# verify that the spawn breakage didn't interrupt the sequence
# and terminates properly
for i in xrange(4,10):
p.spawn(passthru, i)
self.assertEquals(list(p), list(xrange(10)))
def test_constructing_from_pool(self):
pool = greenpool.GreenPool(2)
pile1 = greenpool.GreenPile(pool)
pile2 = greenpool.GreenPile(pool)
def bunch_of_work(pile, unique):
for i in xrange(10):
pile.spawn(passthru, i + unique)
eventlet.spawn(bunch_of_work, pile1, 0)
eventlet.spawn(bunch_of_work, pile2, 100)
eventlet.sleep(0)
self.assertEquals(list(pile2), list(xrange(100,110)))
self.assertEquals(list(pile1), list(xrange(10)))
class StressException(Exception):
pass
r = random.Random(0)
def pressure(arg):
while r.random() < 0.5:
eventlet.sleep(r.random() * 0.001)
if r.random() < 0.8:
return arg
else:
raise StressException(arg)
def passthru(arg):
while r.random() < 0.5:
eventlet.sleep(r.random() * 0.001)
return arg
class Stress(tests.LimitedTestCase):
# tests will take extra-long
TEST_TIMEOUT=10
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def spawn_order_check(self, concurrency):
# checks that piles are strictly ordered
p = greenpool.GreenPile(concurrency)
def makework(count, unique):
for i in xrange(count):
token = (unique, i)
p.spawn(pressure, token)
iters = 1000
eventlet.spawn(makework, iters, 1)
eventlet.spawn(makework, iters, 2)
eventlet.spawn(makework, iters, 3)
p.spawn(pressure, (0,0))
latest = [-1] * 4
received = 0
it = iter(p)
while True:
try:
i = it.next()
except StressException, exc:
i = exc[0]
except StopIteration:
break
received += 1
if received % 5 == 0:
api.sleep(0.0001)
unique, order = i
self.assert_(latest[unique] < order)
latest[unique] = order
for l in latest[1:]:
self.assertEquals(l, iters - 1)
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def test_ordering_5(self):
self.spawn_order_check(5)
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def test_ordering_50(self):
self.spawn_order_check(50)
def imap_memory_check(self, concurrency):
# checks that imap is strictly
# ordered and consumes a constant amount of memory
p = greenpool.GreenPool(concurrency)
count = 1000
it = p.imap(passthru, xrange(count))
latest = -1
while True:
try:
i = it.next()
except StopIteration:
break
if latest == -1:
gc.collect()
initial_obj_count = len(gc.get_objects())
self.assert_(i > latest)
latest = i
if latest % 5 == 0:
api.sleep(0.001)
if latest % 10 == 0:
gc.collect()
objs_created = len(gc.get_objects()) - initial_obj_count
self.assert_(objs_created < 25 * concurrency, objs_created)
# make sure we got to the end
self.assertEquals(latest, count - 1)
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def test_imap_50(self):
self.imap_memory_check(50)
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def test_imap_500(self):
self.imap_memory_check(500)
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def test_with_intpool(self):
from eventlet import pools
class IntPool(pools.Pool):
def create(self):
self.current_integer = getattr(self, 'current_integer', 0) + 1
return self.current_integer
def subtest(intpool_size, pool_size, num_executes):
def run(int_pool):
token = int_pool.get()
eventlet.sleep(0.0001)
int_pool.put(token)
return token
int_pool = IntPool(max_size=intpool_size)
pool = greenpool.GreenPool(pool_size)
for ix in xrange(num_executes):
pool.spawn(run, int_pool)
pool.waitall()
subtest(4, 7, 7)
subtest(50, 75, 100)
for isize in (10, 20, 30, 40, 50):
for psize in (5, 25, 35, 50):
subtest(isize, psize, psize)

View File

@@ -1,4 +1,4 @@
from tests import LimitedTestCase, SilencedTestCase, main
from tests import LimitedTestCase, main
import time
from eventlet import api
from eventlet import hubs
@@ -32,14 +32,16 @@ class TestScheduleCall(LimitedTestCase):
class TestDebug(LimitedTestCase):
def test_debug(self):
hubs.get_hub().debug = True
self.assert_(hubs.get_hub().debug)
hubs.get_hub().debug = False
self.assert_(not hubs.get_hub().debug)
def test_debug_listeners(self):
hubs.get_hub().set_debug_listeners(True)
hubs.get_hub().set_debug_listeners(False)
def test_timer_exceptions(self):
hubs.get_hub().set_timer_exceptions(True)
hubs.get_hub().set_timer_exceptions(False)
class TestExceptionInMainloop(SilencedTestCase):
class TestExceptionInMainloop(LimitedTestCase):
def test_sleep(self):
# even if there was an error in the mainloop, the hub should continue to work
start = time.time()

312
tests/queue_test.py Normal file
View File

@@ -0,0 +1,312 @@
from tests import LimitedTestCase, main
import eventlet
from eventlet import greenthread
def do_bail(q):
eventlet.exc_after(0, RuntimeError())
try:
result = q.get()
return result
except RuntimeError:
return 'timed out'
class TestQueue(LimitedTestCase):
def test_send_first(self):
q = eventlet.Queue()
q.put('hi')
self.assertEquals(q.get(), 'hi')
def test_send_last(self):
q = eventlet.Queue()
def waiter(q):
self.assertEquals(q.get(), 'hi2')
gt = eventlet.spawn(eventlet.with_timeout, 0.1, waiter, q)
eventlet.sleep(0)
eventlet.sleep(0)
q.put('hi2')
gt.wait()
def test_max_size(self):
q = eventlet.Queue(2)
results = []
def putter(q):
q.put('a')
results.append('a')
q.put('b')
results.append('b')
q.put('c')
results.append('c')
gt = eventlet.spawn(putter, q)
eventlet.sleep(0)
self.assertEquals(results, ['a', 'b'])
self.assertEquals(q.get(), 'a')
eventlet.sleep(0)
self.assertEquals(results, ['a', 'b', 'c'])
self.assertEquals(q.get(), 'b')
self.assertEquals(q.get(), 'c')
gt.wait()
def test_zero_max_size(self):
q = eventlet.Queue(0)
def sender(evt, q):
q.put('hi')
evt.send('done')
def receiver(q):
x = q.get()
return x
evt = greenthread.Event()
gt = eventlet.spawn(sender, evt, q)
eventlet.sleep(0)
self.assert_(not evt.ready())
gt2 = eventlet.spawn(receiver, q)
self.assertEquals(gt2.wait(),'hi')
self.assertEquals(evt.wait(),'done')
gt.wait()
def test_multiple_waiters(self):
# tests that multiple waiters get their results back
q = eventlet.Queue()
def waiter(q):
return q.get()
sendings = ['1', '2', '3', '4']
gts = [eventlet.spawn(waiter, q)
for x in sendings]
eventlet.sleep(0.01) # get 'em all waiting
results = set()
def collect_pending_results():
for i, gt in enumerate(gts):
timer = eventlet.exc_after(0.001, eventlet.TimeoutError)
try:
x = gt.wait()
results.add(x)
timer.cancel()
except eventlet.TimeoutError:
pass # no pending result from that greenthread yet
return len(results)
q.put(sendings[0])
self.assertEquals(collect_pending_results(), 1)
q.put(sendings[1])
self.assertEquals(collect_pending_results(), 2)
q.put(sendings[2])
q.put(sendings[3])
self.assertEquals(collect_pending_results(), 4)
def test_waiters_that_cancel(self):
q = eventlet.Queue()
gt = eventlet.spawn(do_bail, q)
self.assertEquals(gt.wait(), 'timed out')
q.put('hi')
self.assertEquals(q.get(), 'hi')
def test_getting_before_sending(self):
q = eventlet.Queue()
gt = eventlet.spawn(q.put, 'sent')
self.assertEquals(q.get(), 'sent')
gt.wait()
def test_two_waiters_one_dies(self):
def waiter(q):
return q.get()
q = eventlet.Queue()
dying = eventlet.spawn(do_bail, q)
waiting = eventlet.spawn(waiter, q)
eventlet.sleep(0)
q.put('hi')
self.assertEquals(dying.wait(), 'timed out')
self.assertEquals(waiting.wait(), 'hi')
def test_two_bogus_waiters(self):
q = eventlet.Queue()
gt1 = eventlet.spawn(do_bail, q)
gt2 = eventlet.spawn(do_bail, q)
eventlet.sleep(0)
q.put('sent')
self.assertEquals(gt1.wait(), 'timed out')
self.assertEquals(gt2.wait(), 'timed out')
self.assertEquals(q.get(), 'sent')
def test_waiting(self):
return # TODO add this to the new queue
q = eventlet.Queue()
gt1 = eventlet.spawn(q.get)
eventlet.sleep(0)
self.assertEquals(1, q.waiting())
q.put('hi')
eventlet.sleep(0)
self.assertEquals(0, q.waiting())
self.assertEquals('hi', gt1.wait())
self.assertEquals(0, q.waiting())
def test_channel_send(self):
channel = eventlet.Queue(0)
events = []
def another_greenlet():
events.append(channel.get())
events.append(channel.get())
gt = eventlet.spawn(another_greenlet)
events.append('sending')
channel.put('hello')
events.append('sent hello')
channel.put('world')
events.append('sent world')
self.assertEqual(['sending', 'hello', 'sent hello', 'world', 'sent world'], events)
def test_channel_wait(self):
channel = eventlet.Queue(0)
events = []
def another_greenlet():
events.append('sending hello')
channel.put('hello')
events.append('sending world')
channel.put('world')
events.append('sent world')
gt = eventlet.spawn(another_greenlet)
events.append('waiting')
events.append(channel.get())
events.append(channel.get())
self.assertEqual(['waiting', 'sending hello', 'hello', 'sending world', 'world'], events)
eventlet.sleep(0)
self.assertEqual(['waiting', 'sending hello', 'hello', 'sending world', 'world', 'sent world'], events)
def test_channel_waiters(self):
c = eventlet.Queue(0)
w1 = eventlet.spawn(c.get)
w2 = eventlet.spawn(c.get)
w3 = eventlet.spawn(c.get)
eventlet.sleep(0)
# TODO add waiting method to queue
#self.assertEquals(c.waiting(), 3)
s1 = eventlet.spawn(c.put, 1)
s2 = eventlet.spawn(c.put, 2)
s3 = eventlet.spawn(c.put, 3)
eventlet.sleep(0) # this gets all the sends into a waiting state
# TODO add waiting method to queue
#self.assertEquals(c.waiting(), 0)
s1.wait()
s2.wait()
s3.wait()
self.assertEquals(w1.wait(), 1)
self.assertEquals(w2.wait(), 2)
self.assertEquals(w3.wait(), 3)
def test_channel_sender_timing_out(self):
from eventlet import queue
c = eventlet.Queue(0)
self.assertRaises(queue.Full, c.put, "hi", timeout=0.001)
self.assertRaises(queue.Empty, c.get_nowait)
def test_task_done(self):
from eventlet import queue, debug
channel = queue.JoinableQueue(0)
X = object()
gt = eventlet.spawn(channel.put, X)
result = channel.get()
assert result is X, (result, X)
assert channel.unfinished_tasks == 1, channel.unfinished_tasks
channel.task_done()
assert channel.unfinished_tasks == 0, channel.unfinished_tasks
gt.wait()
def store_result(result, func, *args):
try:
result.append(func(*args))
except Exception, exc:
result.append(exc)
class TestNoWait(LimitedTestCase):
def test_put_nowait_simple(self):
from eventlet import hubs,queue
hub = hubs.get_hub()
result = []
q = eventlet.Queue(1)
hub.schedule_call_global(0, store_result, result, q.put_nowait, 2)
hub.schedule_call_global(0, store_result, result, q.put_nowait, 3)
eventlet.sleep(0)
eventlet.sleep(0)
assert len(result)==2, result
assert result[0]==None, result
assert isinstance(result[1], queue.Full), result
def test_get_nowait_simple(self):
from eventlet import hubs,queue
hub = hubs.get_hub()
result = []
q = queue.Queue(1)
q.put(4)
hub.schedule_call_global(0, store_result, result, q.get_nowait)
hub.schedule_call_global(0, store_result, result, q.get_nowait)
eventlet.sleep(0)
assert len(result)==2, result
assert result[0]==4, result
assert isinstance(result[1], queue.Empty), result
# get_nowait must work from the mainloop
def test_get_nowait_unlock(self):
from eventlet import hubs,queue
hub = hubs.get_hub()
result = []
q = queue.Queue(0)
p = eventlet.spawn(q.put, 5)
assert q.empty(), q
assert q.full(), q
eventlet.sleep(0)
assert q.empty(), q
assert q.full(), q
hub.schedule_call_global(0, store_result, result, q.get_nowait)
eventlet.sleep(0)
assert q.empty(), q
assert q.full(), q
assert result == [5], result
# TODO add ready to greenthread
#assert p.ready(), p
assert p.dead, p
assert q.empty(), q
# put_nowait must work from the mainloop
def test_put_nowait_unlock(self):
from eventlet import hubs,queue
hub = hubs.get_hub()
result = []
q = queue.Queue(0)
p = eventlet.spawn(q.get)
assert q.empty(), q
assert q.full(), q
eventlet.sleep(0)
assert q.empty(), q
assert q.full(), q
hub.schedule_call_global(0, store_result, result, q.put_nowait, 10)
# TODO ready method on greenthread
#assert not p.ready(), p
eventlet.sleep(0)
assert result == [None], result
# TODO ready method
# assert p.ready(), p
assert q.full(), q
assert q.empty(), q
if __name__=='__main__':
main()
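The channel tests above all hinge on the rendezvous behavior of a zero-sized queue: a ``put`` blocks until another green thread is ready to ``get``, and vice versa. A minimal standalone sketch of that hand-off, using the same ``eventlet.Queue`` and ``eventlet.spawn`` calls the tests exercise (this is illustration only, not part of the test file)::

    import eventlet

    channel = eventlet.Queue(0)  # a zero-sized queue acts as a channel

    def consumer():
        # blocks here until the producer puts a value
        print "consumed", channel.get()

    gt = eventlet.spawn(consumer)
    channel.put('hello')  # blocks until the consumer is ready to receive
    gt.wait()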


@@ -2,14 +2,14 @@ import sys
 import unittest
 from eventlet.api import sleep, with_timeout
 from eventlet import api, proc, coros
-from tests import SilencedTestCase, skipped
+from tests import LimitedTestCase, skipped

 DELAY = 0.01

 class ExpectedError(Exception):
     pass

-class TestLink_Signal(SilencedTestCase):
+class TestLink_Signal(LimitedTestCase):

     def test_send(self):
         s = proc.Source()
@@ -48,7 +48,7 @@ class TestLink_Signal(SilencedTestCase):
         self.assertRaises(OSError, s.wait)

-class TestProc(SilencedTestCase):
+class TestProc(LimitedTestCase):

     def test_proc(self):
         p = proc.spawn(lambda : 100)
@@ -76,13 +76,13 @@ class TestProc(SilencedTestCase):
         self.assertRaises(proc.LinkedCompleted, sleep, 0.1)

-class TestCase(SilencedTestCase):
+class TestCase(LimitedTestCase):

     def link(self, p, listener=None):
         getattr(p, self.link_method)(listener)

     def tearDown(self):
-        SilencedTestCase.tearDown(self)
+        LimitedTestCase.tearDown(self)
         self.p.unlink()

     def set_links(self, p, first_time, kill_exc_type):
@@ -252,7 +252,7 @@ class TestRaise_link_exception(TestRaise_link):
     link_method = 'link_exception'

-class TestStuff(SilencedTestCase):
+class TestStuff(LimitedTestCase):

     def test_wait_noerrors(self):
         x = proc.spawn(lambda : 1)


@@ -86,7 +86,7 @@ class ConnectionClosed(Exception):

 def read_http(sock):
-    fd = sock.makeGreenFile()
+    fd = sock.makefile()
     try:
         response_line = fd.readline()
     except socket.error, exc:
@@ -95,11 +95,19 @@ def read_http(sock):
         raise
     if not response_line:
         raise ConnectionClosed
-    raw_headers = fd.readuntil('\r\n\r\n').strip()
-    #print "R", response_line, raw_headers
+    header_lines = []
+    while True:
+        line = fd.readline()
+        if line == '\r\n':
+            break
+        else:
+            header_lines.append(line)
+
     headers = dict()
-    for x in raw_headers.split('\r\n'):
-        #print "X", x
+    for x in header_lines:
+        x = x.strip()
+        if not x:
+            continue
         key, value = x.split(': ', 1)
         assert key.lower() not in headers, "%s header duplicated" % key
         headers[key.lower()] = value
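For reference, ``read_http`` returns the whole exchange as a ``(response_line, headers, body)`` triple, with header names lowercased, which is how the assertions later in this file consume it. A short usage sketch (assuming ``sock`` is a connected green socket with a response pending)::

    response_line, headers, body = read_http(sock)
    assert response_line.startswith('HTTP/1.1')
    # read_http lowercases header names before storing them
    assert 'content-length' in headers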
@@ -114,35 +122,49 @@ def read_http(sock):
     return response_line, headers, body

-class HttpdTestCase(LimitedTestCase):
+class TestHttpd(LimitedTestCase):
     mode = 'static'
-    keepalive = True
     def setUp(self):
-        super(HttpdTestCase, self).setUp()
+        super(TestHttpd, self).setUp()
         self.logfile = StringIO()
         self.site = Site()
-        listener = api.tcp_listener(('localhost', 0))
-        self.port = listener.getsockname()[1]
-        self.killer = api.spawn(
-            wsgi.server,
-            listener,
-            self.site,
-            max_size=128,
-            log=self.logfile,
-            keepalive=self.keepalive)
+        self.killer = None
+        self.spawn_server()

     def tearDown(self):
-        super(HttpdTestCase, self).tearDown()
+        super(TestHttpd, self).tearDown()
         api.kill(self.killer)
         api.sleep(0)

-class TestHttpd(HttpdTestCase):
+    def spawn_server(self, **kwargs):
+        """Spawns a new wsgi server with the given arguments.
+        Sets self.port to the port of the server, and self.killer to the
+        greenlet running it.
+
+        Kills any previously-running server."""
+        if self.killer:
+            api.kill(self.killer)
+
+        new_kwargs = dict(max_size=128,
+                          log=self.logfile,
+                          site=self.site)
+        new_kwargs.update(kwargs)
+
+        if 'sock' not in new_kwargs:
+            new_kwargs['sock'] = api.tcp_listener(('localhost', 0))
+
+        self.port = new_kwargs['sock'].getsockname()[1]
+        self.killer = api.spawn(
+            wsgi.server,
+            **new_kwargs)

     def test_001_server(self):
         sock = api.connect_tcp(
             ('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('GET / HTTP/1.0\r\nHost: localhost\r\n\r\n')
+        fd.flush()
         result = fd.read()
         fd.close()
         ## The server responds with the maximum version it supports
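Since ``spawn_server`` merges its keyword arguments over the defaults (``max_size``, ``log``, ``site``) and re-derives ``self.port`` from whichever listening socket ends up being used, a test can now restart the server in one line. A hypothetical test using the helper might read::

    def test_custom_max_size(self):
        # any wsgi.server keyword argument can be overridden per-test
        self.spawn_server(max_size=1)
        sock = api.connect_tcp(('localhost', self.port))  # port reflects the new socket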
@@ -153,10 +175,12 @@ class TestHttpd(HttpdTestCase):
         sock = api.connect_tcp(
             ('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
+        fd.flush()
         read_http(sock)
         fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
+        fd.flush()
         read_http(sock)
         fd.close()
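One pattern recurs throughout these hunks: ``sock.makefile()`` returns a buffered file object, unlike the old ``makeGreenFile``, so each ``fd.write()`` must now be followed by ``fd.flush()`` before the server sees any bytes. The minimal client-side shape (``sock`` assumed to be a connected green socket)::

    fd = sock.makefile()
    fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
    fd.flush()  # makefile() buffers writes; unflushed requests never reach the wire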
@@ -165,8 +189,9 @@ class TestHttpd(HttpdTestCase):
         sock = api.connect_tcp(
             ('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
+        fd.flush()
         cancel = api.exc_after(1, RuntimeError)
         self.assertRaises(TypeError, fd.read, "This shouldn't work")
         cancel.cancel()
@@ -176,12 +201,15 @@ class TestHttpd(HttpdTestCase):
         sock = api.connect_tcp(
             ('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
+        fd.flush()
         read_http(sock)
         fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
+        fd.flush()
         read_http(sock)
         fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
+        fd.flush()
         self.assertRaises(ConnectionClosed, read_http, sock)
         fd.close()
@@ -201,8 +229,9 @@ class TestHttpd(HttpdTestCase):
             path_parts.append('path')
         path = '/'.join(path_parts)
         request = 'GET /%s HTTP/1.0\r\nHost: localhost\r\n\r\n' % path
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write(request)
+        fd.flush()
         result = fd.readline()
         if result:
             # windows closes the socket before the data is flushed,
@@ -227,8 +256,9 @@ class TestHttpd(HttpdTestCase):
             'Content-Length: 3',
             '',
             'a=a'))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write(request)
+        fd.flush()
         # send some junk after the actual request
         fd.write('01234567890123456789')
@@ -240,12 +270,15 @@ class TestHttpd(HttpdTestCase):
         sock = api.connect_tcp(
             ('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
+        fd.flush()
         response_line_200,_,_ = read_http(sock)
         fd.write('GET /notexist HTTP/1.1\r\nHost: localhost\r\n\r\n')
+        fd.flush()
         response_line_404,_,_ = read_http(sock)
         fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
+        fd.flush()
         response_line_test,_,_ = read_http(sock)
         self.assertEqual(response_line_200,response_line_test)
         fd.close()
@@ -255,8 +288,9 @@ class TestHttpd(HttpdTestCase):
         sock = api.connect_tcp(
             ('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
+        fd.flush()
         self.assert_('Transfer-Encoding: chunked' in fd.read())

     def test_010_no_chunked_http_1_0(self):
@@ -264,8 +298,9 @@ class TestHttpd(HttpdTestCase):
         sock = api.connect_tcp(
             ('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('GET / HTTP/1.0\r\nHost: localhost\r\nConnection: close\r\n\r\n')
+        fd.flush()
         self.assert_('Transfer-Encoding: chunked' not in fd.read())

     def test_011_multiple_chunks(self):
@@ -273,9 +308,16 @@ class TestHttpd(HttpdTestCase):
         sock = api.connect_tcp(
             ('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
-        headers = fd.readuntil('\r\n\r\n')
+        fd.flush()
+        headers = ''
+        while True:
+            line = fd.readline()
+            if line == '\r\n':
+                break
+            else:
+                headers += line
         self.assert_('Transfer-Encoding: chunked' in headers)
         chunks = 0
         chunklen = int(fd.readline(), 16)
@@ -295,10 +337,9 @@ class TestHttpd(HttpdTestCase):
         private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
         server_sock = api.ssl_listener(('localhost', 0), certificate_file, private_key_file)
-        api.spawn(wsgi.server, server_sock, wsgi_app, log=StringIO())
-        sock = api.connect_tcp(('localhost', server_sock.getsockname()[1]))
+        self.spawn_server(sock=server_sock, site=wsgi_app)
+        sock = api.connect_tcp(('localhost', self.port))
         sock = util.wrap_ssl(sock)
         sock.write('POST /foo HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\nContent-length:3\r\n\r\nabc')
         result = sock.read(8192)
@@ -312,7 +353,7 @@ class TestHttpd(HttpdTestCase):
         certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
         private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
         server_sock = api.ssl_listener(('localhost', 0), certificate_file, private_key_file)
-        api.spawn(wsgi.server, server_sock, wsgi_app, log=StringIO())
+        self.spawn_server(sock=server_sock, site=wsgi_app)
         sock = api.connect_tcp(('localhost', server_sock.getsockname()[1]))
         sock = util.wrap_ssl(sock)
@@ -323,43 +364,54 @@ class TestHttpd(HttpdTestCase):
     def test_014_chunked_post(self):
         self.site.application = chunked_post
         sock = api.connect_tcp(('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('PUT /a HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n'
                  'Transfer-Encoding: chunked\r\n\r\n'
                  '2\r\noh\r\n4\r\n hai\r\n0\r\n\r\n')
-        fd.readuntil('\r\n\r\n')
+        fd.flush()
+        while True:
+            if fd.readline() == '\r\n':
+                break
         response = fd.read()
         self.assert_(response == 'oh hai', 'invalid response %s' % response)

         sock = api.connect_tcp(('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('PUT /b HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n'
                  'Transfer-Encoding: chunked\r\n\r\n'
                  '2\r\noh\r\n4\r\n hai\r\n0\r\n\r\n')
-        fd.readuntil('\r\n\r\n')
+        fd.flush()
+        while True:
+            if fd.readline() == '\r\n':
+                break
         response = fd.read()
         self.assert_(response == 'oh hai', 'invalid response %s' % response)

         sock = api.connect_tcp(('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('PUT /c HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n'
                  'Transfer-Encoding: chunked\r\n\r\n'
                  '2\r\noh\r\n4\r\n hai\r\n0\r\n\r\n')
-        fd.readuntil('\r\n\r\n')
+        fd.flush()
+        while True:
+            if fd.readline() == '\r\n':
+                break
         response = fd.read(8192)
         self.assert_(response == 'oh hai', 'invalid response %s' % response)

     def test_015_write(self):
         self.site.application = use_write
         sock = api.connect_tcp(('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('GET /a HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
+        fd.flush()
         response_line, headers, body = read_http(sock)
         self.assert_('content-length' in headers)

         sock = api.connect_tcp(('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('GET /b HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
+        fd.flush()
         response_line, headers, body = read_http(sock)
         self.assert_('transfer-encoding' in headers)
         self.assert_(headers['transfer-encoding'] == 'chunked')
@@ -374,10 +426,17 @@ class TestHttpd(HttpdTestCase):
             return ['testing']
         self.site.application = wsgi_app
         sock = api.connect_tcp(('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('GET /a HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
-        headerlines = fd.readuntil('\r\n\r\n').splitlines()
-        self.assertEquals(1, len([l for l in headerlines
+        fd.flush()
+        header_lines = []
+        while True:
+            line = fd.readline()
+            if line == '\r\n':
+                break
+            else:
+                header_lines.append(line)
+        self.assertEquals(1, len([l for l in header_lines
             if l.lower().startswith('content-length')]))

     def test_017_ssl_zeroreturnerror(self):
@@ -420,14 +479,19 @@ class TestHttpd(HttpdTestCase):
         sock = api.connect_tcp(
             ('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('GET / HTTP/1.0\r\nHost: localhost\r\nConnection: keep-alive\r\n\r\n')
-        self.assert_('connection: keep-alive' in
-                     fd.readuntil('\r\n\r\n').lower())
+        fd.flush()
+        response_line, headers, body = read_http(sock)
+        self.assert_('connection' in headers)
+        self.assertEqual('keep-alive', headers['connection'])
         # repeat request to verify connection is actually still open
         fd.write('GET / HTTP/1.0\r\nHost: localhost\r\nConnection: keep-alive\r\n\r\n')
-        self.assert_('connection: keep-alive' in
-                     fd.readuntil('\r\n\r\n').lower())
+        fd.flush()
+        response_line, headers, body = read_http(sock)
+        self.assert_('connection' in headers)
+        self.assertEqual('keep-alive', headers['connection'])

     def test_019_fieldstorage_compat(self):
         def use_fieldstorage(environ, start_response):
@@ -441,13 +505,14 @@ class TestHttpd(HttpdTestCase):
         sock = api.connect_tcp(
             ('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('POST / HTTP/1.1\r\n'
                  'Host: localhost\r\n'
                  'Connection: close\r\n'
                  'Transfer-Encoding: chunked\r\n\r\n'
                  '2\r\noh\r\n'
                  '4\r\n hai\r\n0\r\n\r\n')
+        fd.flush()
         self.assert_('hello!' in fd.read())

     def test_020_x_forwarded_for(self):
@@ -459,16 +524,7 @@ class TestHttpd(HttpdTestCase):
         # turning off the option should work too
         self.logfile = StringIO()
-        api.kill(self.killer)
-        listener = api.tcp_listener(('localhost', 0))
-        self.port = listener.getsockname()[1]
-        self.killer = api.spawn(
-            wsgi.server,
-            listener,
-            self.site,
-            max_size=128,
-            log=self.logfile,
-            log_x_forwarded_for=False)
+        self.spawn_server(log_x_forwarded_for=False)

         sock = api.connect_tcp(('localhost', self.port))
         sock.sendall('GET / HTTP/1.1\r\nHost: localhost\r\nX-Forwarded-For: 1.2.3.4, 5.6.7.8\r\n\r\n')
@@ -481,15 +537,14 @@ class TestHttpd(HttpdTestCase):

     def test_socket_remains_open(self):
         api.kill(self.killer)
         server_sock = api.tcp_listener(('localhost', 0))
-        self.port = server_sock.getsockname()[1]
         server_sock_2 = server_sock.dup()
-        self.killer = api.spawn(wsgi.server, server_sock_2, hello_world,
-                                log=self.logfile)
+        self.spawn_server(sock=server_sock_2)

         # do a single req/response to verify it's up
         sock = api.connect_tcp(('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('GET / HTTP/1.0\r\nHost: localhost\r\n\r\n')
-        result = fd.read()
+        fd.flush()
+        result = fd.read(1024)
         fd.close()
         self.assert_(result.startswith('HTTP'), result)
         self.assert_(result.endswith('hello world'))
@@ -497,17 +552,18 @@ class TestHttpd(HttpdTestCase):
         # shut down the server and verify the server_socket fd is still open,
         # but the actual socketobject passed in to wsgi.server is closed
         api.kill(self.killer)
-        api.sleep(0.01)
+        api.sleep(0.001)  # make the kill go through
         try:
             server_sock_2.accept()
             # shouldn't be able to use this one anymore
         except socket.error, exc:
             self.assertEqual(exc[0], errno.EBADF)
-        self.killer = api.spawn(wsgi.server, server_sock, hello_world,
-                                log=self.logfile)
+        self.spawn_server(sock=server_sock)
         sock = api.connect_tcp(('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('GET / HTTP/1.0\r\nHost: localhost\r\n\r\n')
-        result = fd.read()
+        fd.flush()
+        result = fd.read(1024)
         fd.close()
         self.assert_(result.startswith('HTTP'), result)
         self.assert_(result.endswith('hello world'))
@@ -525,11 +581,12 @@ class TestHttpd(HttpdTestCase):
             return []
         self.site.application = clobberin_time
         sock = api.connect_tcp(('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('GET / HTTP/1.1\r\n'
                  'Host: localhost\r\n'
                  'Connection: close\r\n'
                  '\r\n\r\n')
+        fd.flush()
         self.assert_('200 OK' in fd.read())

     def test_022_custom_pool(self):
@@ -538,22 +595,14 @@ class TestHttpd(HttpdTestCase):
         # ensure that all clients finished
         from eventlet import pool
         p = pool.Pool(max_size=5)
-        api.kill(self.killer)
-        listener = api.tcp_listener(('localhost', 0))
-        self.port = listener.getsockname()[1]
-        self.killer = api.spawn(
-            wsgi.server,
-            listener,
-            self.site,
-            max_size=128,
-            log=self.logfile,
-            custom_pool=p)
+        self.spawn_server(custom_pool=p)

         # this stuff is copied from test_001_server, could be better factored
         sock = api.connect_tcp(
             ('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('GET / HTTP/1.0\r\nHost: localhost\r\n\r\n')
+        fd.flush()
         result = fd.read()
         fd.close()
         self.assert_(result.startswith('HTTP'), result)
@@ -562,8 +611,9 @@ class TestHttpd(HttpdTestCase):
     def test_023_bad_content_length(self):
         sock = api.connect_tcp(
             ('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('GET / HTTP/1.0\r\nHost: localhost\r\nContent-length: argh\r\n\r\n')
+        fd.flush()
         result = fd.read()
         fd.close()
         self.assert_(result.startswith('HTTP'), result)
@@ -581,31 +631,41 @@ class TestHttpd(HttpdTestCase):
             return [text]
         self.site.application = wsgi_app
         sock = api.connect_tcp(('localhost', self.port))
-        fd = sock.makeGreenFile()
+        fd = sock.makefile()
         fd.write('PUT / HTTP/1.1\r\nHost: localhost\r\nContent-length: 1025\r\nExpect: 100-continue\r\n\r\n')
-        result = fd.readuntil('\r\n\r\n')
-        self.assert_(result.startswith('HTTP/1.1 417 Expectation Failed'))
-        self.assertEquals(fd.read(7), 'failure')
+        fd.flush()
+        response_line, headers, body = read_http(sock)
+        self.assert_(response_line.startswith('HTTP/1.1 417 Expectation Failed'))
+        self.assertEquals(body, 'failure')
         fd.write('PUT / HTTP/1.1\r\nHost: localhost\r\nContent-length: 7\r\nExpect: 100-continue\r\n\r\ntesting')
-        result = fd.readuntil('\r\n\r\n')
-        self.assert_(result.startswith('HTTP/1.1 100 Continue'))
-        result = fd.readuntil('\r\n\r\n')
-        self.assert_(result.startswith('HTTP/1.1 200 OK'))
+        fd.flush()
+        header_lines = []
+        while True:
+            line = fd.readline()
+            if line == '\r\n':
+                break
+            else:
+                header_lines.append(line)
+        self.assert_(header_lines[0].startswith('HTTP/1.1 100 Continue'))
+        header_lines = []
+        while True:
+            line = fd.readline()
+            if line == '\r\n':
+                break
+            else:
+                header_lines.append(line)
+        self.assert_(header_lines[0].startswith('HTTP/1.1 200 OK'))
         self.assertEquals(fd.read(7), 'testing')
         fd.close()

     def test_025_accept_errors(self):
-        api.kill(self.killer)
+        from eventlet import debug
+        debug.hub_exceptions(True)
         listener = greensocket.socket()
         listener.bind(('localhost', 0))
         # NOT calling listen, to trigger the error
-        self.port = listener.getsockname()[1]
-        self.killer = api.spawn(
-            wsgi.server,
-            listener,
-            self.site,
-            max_size=128,
-            log=self.logfile)
+        self.logfile = StringIO()
+        self.spawn_server(sock=listener)
         old_stderr = sys.stderr
         try:
             sys.stderr = self.logfile
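The expect/100-continue test above exercises both halves of the HTTP/1.1 handshake: an oversized body draws an immediate ``417 Expectation Failed``, while an acceptable one gets the interim ``100 Continue`` status line before the final ``200 OK``. A trimmed sketch of the successful path (assuming ``fd`` wraps a connected socket, as in the test)::

    fd.write('PUT / HTTP/1.1\r\nHost: localhost\r\n'
             'Content-length: 7\r\nExpect: 100-continue\r\n\r\ntesting')
    fd.flush()
    # the server answers twice: an interim 100, then the real response
    assert fd.readline().startswith('HTTP/1.1 100 Continue')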
@@ -620,6 +680,15 @@ class TestHttpd(HttpdTestCase):
                          self.logfile.getvalue())
         finally:
             sys.stderr = old_stderr
+            debug.hub_exceptions(False)
+
+    def test_026_log_format(self):
+        self.spawn_server(log_format="HI %(request_line)s HI")
+        sock = api.connect_tcp(('localhost', self.port))
+        sock.sendall('GET /yo! HTTP/1.1\r\nHost: localhost\r\n\r\n')
+        sock.recv(1024)
+        sock.close()
+        self.assert_('\nHI GET /yo! HTTP/1.1 HI\n' in self.logfile.getvalue(), self.logfile.getvalue())

     def test_close_chunked_with_1_0_client(self):
         # verify that if we return a generator from our app
@@ -635,19 +704,17 @@ class TestHttpd(HttpdTestCase):
         self.assertNotEqual(headers.get('transfer-encoding'), 'chunked')
         self.assertEquals(body, "thisischunked")

-class KeepAliveTestHttpd(HttpdTestCase):
-    keepalive = False
     def test_026_http_10_nokeepalive(self):
         # verify that if an http/1.0 client sends connection: keep-alive
         # and the server doesn't accept keep-alives, we close the connection
+        self.spawn_server(keepalive=False)
         sock = api.connect_tcp(
             ('localhost', self.port))
-        fd = sock.makeGreenFile()
-        fd.write('GET / HTTP/1.0\r\nHost: localhost\r\nConnection: keep-alive\r\n\r\n')
-        self.assert_('connection: close' in
-                     fd.readuntil('\r\n\r\n').lower())
+        sock.sendall('GET / HTTP/1.0\r\nHost: localhost\r\nConnection: keep-alive\r\n\r\n')
+        response_line, headers, body = read_http(sock)
+        self.assertEqual(headers['connection'], 'close')

 if __name__ == '__main__':
     main()