This commit is contained in:
Ryan Williams
2010-02-25 02:45:41 -05:00
23 changed files with 461 additions and 417 deletions

View File

@@ -12,4 +12,7 @@ annotated
cover cover
nosetests*.xml nosetests*.xml
.coverage .coverage
*,cover *,cover
syntax: re
^.ropeproject/.*$

View File

@@ -1,5 +1,4 @@
import collections import collections
import time
import traceback import traceback
import warnings import warnings
@@ -16,34 +15,34 @@ class NOT_USED:
NOT_USED = NOT_USED() NOT_USED = NOT_USED()
def Event(*a, **kw): def Event(*a, **kw):
warnings.warn("The Event class has been moved to the event module! " warnings.warn("The Event class has been moved to the event module! "
"Please construct event.Event objects instead.", "Please construct event.Event objects instead.",
DeprecationWarning, stacklevel=2) DeprecationWarning, stacklevel=2)
return _event.Event(*a, **kw) return _event.Event(*a, **kw)
def event(*a, **kw): def event(*a, **kw):
warnings.warn("The event class has been capitalized and moved! Please " warnings.warn("The event class has been capitalized and moved! Please "
"construct event.Event objects instead.", "construct event.Event objects instead.",
DeprecationWarning, stacklevel=2) DeprecationWarning, stacklevel=2)
return _event.Event(*a, **kw) return _event.Event(*a, **kw)
def Semaphore(count): def Semaphore(count):
warnings.warn("The Semaphore class has moved! Please " warnings.warn("The Semaphore class has moved! Please "
"use semaphore.Semaphore instead.", "use semaphore.Semaphore instead.",
DeprecationWarning, stacklevel=2) DeprecationWarning, stacklevel=2)
return semaphoremod.Semaphore(count) return semaphoremod.Semaphore(count)
def BoundedSemaphore(count): def BoundedSemaphore(count):
warnings.warn("The BoundedSemaphore class has moved! Please " warnings.warn("The BoundedSemaphore class has moved! Please "
"use semaphore.BoundedSemaphore instead.", "use semaphore.BoundedSemaphore instead.",
DeprecationWarning, stacklevel=2) DeprecationWarning, stacklevel=2)
return semaphoremod.BoundedSemaphore(count) return semaphoremod.BoundedSemaphore(count)
def semaphore(count=0, limit=None): def semaphore(count=0, limit=None):
warnings.warn("coros.semaphore is deprecated. Please use either " warnings.warn("coros.semaphore is deprecated. Please use either "
"semaphore.Semaphore or semaphore.BoundedSemaphore instead.", "semaphore.Semaphore or semaphore.BoundedSemaphore instead.",
DeprecationWarning, stacklevel=2) DeprecationWarning, stacklevel=2)
if limit is None: if limit is None:
return Semaphore(count) return Semaphore(count)
@@ -136,7 +135,7 @@ class Queue(object):
def __init__(self): def __init__(self):
warnings.warn("coros.Queue is deprecated. Please use " warnings.warn("coros.Queue is deprecated. Please use "
"eventlet.queue.Queue instead.", "eventlet.queue.Queue instead.",
DeprecationWarning, stacklevel=2) DeprecationWarning, stacklevel=2)
self.items = collections.deque() self.items = collections.deque()
self._waiters = set() self._waiters = set()
@@ -148,7 +147,8 @@ class Queue(object):
return len(self.items) return len(self.items)
def __repr__(self): def __repr__(self):
params = (self.__class__.__name__, hex(id(self)), len(self.items), len(self._waiters)) params = (self.__class__.__name__, hex(id(self)),
len(self.items), len(self._waiters))
return '<%s at %s items[%d] _waiters[%s]>' % params return '<%s at %s items[%d] _waiters[%s]>' % params
def send(self, result=None, exc=None): def send(self, result=None, exc=None):
@@ -195,10 +195,10 @@ class Queue(object):
def waiting(self): def waiting(self):
return len(self._waiters) return len(self._waiters)
def __iter__(self): def __iter__(self):
return self return self
def next(self): def next(self):
return self.wait() return self.wait()
@@ -207,7 +207,7 @@ class Channel(object):
def __init__(self, max_size=0): def __init__(self, max_size=0):
warnings.warn("coros.Channel is deprecated. Please use " warnings.warn("coros.Channel is deprecated. Please use "
"eventlet.queue.Queue(0) instead.", "eventlet.queue.Queue(0) instead.",
DeprecationWarning, stacklevel=2) DeprecationWarning, stacklevel=2)
self.max_size = max_size self.max_size = max_size
self.items = collections.deque() self.items = collections.deque()
@@ -221,7 +221,9 @@ class Channel(object):
return len(self.items) return len(self.items)
def __repr__(self): def __repr__(self):
params = (self.__class__.__name__, hex(id(self)), self.max_size, len(self.items), len(self._waiters), len(self._senders)) params = (self.__class__.__name__, hex(id(self)),
self.max_size, len(self.items),
len(self._waiters), len(self._senders))
return '<%s at %s max=%s items[%d] _w[%s] _s[%s]>' % params return '<%s at %s max=%s items[%d] _w[%s] _s[%s]>' % params
def send(self, result=None, exc=None): def send(self, result=None, exc=None):
@@ -323,7 +325,7 @@ class Actor(object):
to process concurrently. If it is 1, the actor will process messages to process concurrently. If it is 1, the actor will process messages
serially. serially.
""" """
warnings.warn("We're phasing out the Actor class, so as to get rid of" warnings.warn("We're phasing out the Actor class, so as to get rid of"
"the coros module. If you use Actor, please speak up on " "the coros module. If you use Actor, please speak up on "
"eventletdev@lists.secondlife.com, and we'll come up with a " "eventletdev@lists.secondlife.com, and we'll come up with a "
"transition plan. If no one speaks up, we'll remove Actor " "transition plan. If no one speaks up, we'll remove Actor "
@@ -397,4 +399,3 @@ class Actor(object):
>>> eventlet.kill(a._killer) # test cleanup >>> eventlet.kill(a._killer) # test cleanup
""" """
raise NotImplementedError() raise NotImplementedError()

View File

@@ -4,6 +4,7 @@ from eventlet.green import SimpleHTTPServer
from eventlet.green import urllib from eventlet.green import urllib
from eventlet.green import select from eventlet.green import select
test = None # bind prior to patcher.inject to silence pyflakes warning below
patcher.inject('CGIHTTPServer', patcher.inject('CGIHTTPServer',
globals(), globals(),
('BaseHTTPServer', BaseHTTPServer), ('BaseHTTPServer', BaseHTTPServer),
@@ -14,4 +15,4 @@ patcher.inject('CGIHTTPServer',
del patcher del patcher
if __name__ == '__main__': if __name__ == '__main__':
test() test() # pyflakes false alarm here unless test = None above

View File

@@ -11,13 +11,13 @@ import os
import sys import sys
import warnings import warnings
__patched__ = ['fromfd', 'socketpair', 'gethostbyname', 'create_connection', __patched__ = ['fromfd', 'socketpair', 'gethostbyname', 'create_connection',
'ssl', 'socket'] 'ssl', 'socket']
__original_fromfd__ = __socket.fromfd __original_fromfd__ = __socket.fromfd
def fromfd(*args): def fromfd(*args):
return socket(__original_fromfd__(*args)) return socket(__original_fromfd__(*args))
__original_socketpair__ = __socket.socketpair __original_socketpair__ = __socket.socketpair
def socketpair(*args): def socketpair(*args):
one, two = __original_socketpair__(*args) one, two = __original_socketpair__(*args)
@@ -35,7 +35,7 @@ def gethostbyname(name):
globals()['gethostbyname'] = __original_gethostbyname__ globals()['gethostbyname'] = __original_gethostbyname__
else: else:
globals()['gethostbyname'] = _gethostbyname_tpool globals()['gethostbyname'] = _gethostbyname_tpool
return globals()['gethostbyname'](name) return globals()['gethostbyname'](name)
def _gethostbyname_twisted(name): def _gethostbyname_twisted(name):
@@ -51,7 +51,7 @@ def _gethostbyname_tpool(name):
# def getaddrinfo(*args, **kw): # def getaddrinfo(*args, **kw):
# return tpool.execute( # return tpool.execute(
# __socket.getaddrinfo, *args, **kw) # __socket.getaddrinfo, *args, **kw)
# #
# XXX there're few more blocking functions in socket # XXX there're few more blocking functions in socket
# XXX having a hub-independent way to access thread pool would be nice # XXX having a hub-independent way to access thread pool would be nice
@@ -88,10 +88,10 @@ def create_connection(address, timeout=_GLOBAL_DEFAULT_TIMEOUT):
def _convert_to_sslerror(ex): def _convert_to_sslerror(ex):
""" Transliterates SSL.SysCallErrors to socket.sslerrors""" """ Transliterates SSL.SysCallErrors to socket.sslerrors"""
return sslerror((ex[0], ex[1])) return sslerror((ex[0], ex[1]))
class GreenSSLObject(object): class GreenSSLObject(object):
""" Wrapper object around the SSLObjects returned by socket.ssl, which have a """ Wrapper object around the SSLObjects returned by socket.ssl, which have a
slightly different interface from SSL.Connection objects. """ slightly different interface from SSL.Connection objects. """
def __init__(self, green_ssl_obj): def __init__(self, green_ssl_obj):
""" Should only be called by a 'green' socket.ssl """ """ Should only be called by a 'green' socket.ssl """
@@ -106,7 +106,7 @@ class GreenSSLObject(object):
self.connection.do_handshake() self.connection.do_handshake()
except _SSL.SysCallError, e: except _SSL.SysCallError, e:
raise _convert_to_sslerror(e) raise _convert_to_sslerror(e)
def read(self, n=1024): def read(self, n=1024):
"""If n is provided, read n bytes from the SSL connection, otherwise read """If n is provided, read n bytes from the SSL connection, otherwise read
until EOF. The return value is a string of the bytes read.""" until EOF. The return value is a string of the bytes read."""
@@ -116,9 +116,9 @@ class GreenSSLObject(object):
return '' return ''
except _SSL.SysCallError, e: except _SSL.SysCallError, e:
raise _convert_to_sslerror(e) raise _convert_to_sslerror(e)
def write(self, s): def write(self, s):
"""Writes the string s to the on the object's SSL connection. """Writes the string s to the on the object's SSL connection.
The return value is the number of bytes written. """ The return value is the number of bytes written. """
try: try:
return self.connection.write(s) return self.connection.write(s)
@@ -130,13 +130,13 @@ class GreenSSLObject(object):
purposes; do not parse the content of this string because its format can't be purposes; do not parse the content of this string because its format can't be
parsed unambiguously. """ parsed unambiguously. """
return str(self.connection.get_peer_certificate().get_subject()) return str(self.connection.get_peer_certificate().get_subject())
def issuer(self): def issuer(self):
"""Returns a string describing the issuer of the server's certificate. Useful """Returns a string describing the issuer of the server's certificate. Useful
for debugging purposes; do not parse the content of this string because its for debugging purposes; do not parse the content of this string because its
format can't be parsed unambiguously.""" format can't be parsed unambiguously."""
return str(self.connection.get_peer_certificate().get_issuer()) return str(self.connection.get_peer_certificate().get_issuer())
try: try:
try: try:

View File

@@ -2,7 +2,7 @@ from eventlet import patcher
from eventlet.green import thread from eventlet.green import thread
from eventlet.green import time from eventlet.green import time
__patched__ = ['_start_new_thread', '_allocate_lock', '_get_ident', '_sleep', __patched__ = ['_start_new_thread', '_allocate_lock', '_get_ident', '_sleep',
'local', 'stack_size'] 'local', 'stack_size']
patcher.inject('threading', patcher.inject('threading',
@@ -21,6 +21,3 @@ def _patch_main_thread(mod):
curthread = mod._active.pop(mod._get_ident(), None) curthread = mod._active.pop(mod._get_ident(), None)
if curthread: if curthread:
mod._active[thread.get_ident()] = curthread mod._active[thread.get_ident()] = curthread
if __name__ == '__main__':
_test()

View File

@@ -3,3 +3,4 @@ for var in dir(__time):
exec "%s = __time.%s" % (var, var) exec "%s = __time.%s" % (var, var)
__patched__ = ['sleep'] __patched__ = ['sleep']
from eventlet.greenthread import sleep from eventlet.greenthread import sleep
sleep # silence pyflakes

View File

@@ -1,7 +1,4 @@
import eventlet
from eventlet.hubs import trampoline from eventlet.hubs import trampoline
from eventlet.hubs import get_hub
BUFFER_SIZE = 4096 BUFFER_SIZE = 4096
import errno import errno
@@ -12,10 +9,6 @@ import sys
import time import time
import warnings import warnings
from errno import EWOULDBLOCK, EAGAIN
__all__ = ['GreenSocket', 'GreenPipe', 'shutdown_safe'] __all__ = ['GreenSocket', 'GreenPipe', 'shutdown_safe']
CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK)) CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK))
@@ -36,8 +29,8 @@ def socket_connect(descriptor, address):
def socket_accept(descriptor): def socket_accept(descriptor):
""" """
Attempts to accept() on the descriptor, returns a client,address tuple Attempts to accept() on the descriptor, returns a client,address tuple
if it succeeds; returns None if it needs to trampoline, and raises if it succeeds; returns None if it needs to trampoline, and raises
any exceptions. any exceptions.
""" """
try: try:
@@ -46,7 +39,7 @@ def socket_accept(descriptor):
if e[0] == errno.EWOULDBLOCK: if e[0] == errno.EWOULDBLOCK:
return None return None
raise raise
if sys.platform[:3]=="win": if sys.platform[:3]=="win":
# winsock sometimes throws ENOTCONN # winsock sometimes throws ENOTCONN
@@ -62,7 +55,7 @@ else:
def set_nonblocking(fd): def set_nonblocking(fd):
""" """
Sets the descriptor to be nonblocking. Works on many file-like Sets the descriptor to be nonblocking. Works on many file-like
objects as well as sockets. Only sockets can be nonblocking on objects as well as sockets. Only sockets can be nonblocking on
Windows, however. Windows, however.
""" """
try: try:
@@ -96,7 +89,7 @@ try:
from socket import _GLOBAL_DEFAULT_TIMEOUT from socket import _GLOBAL_DEFAULT_TIMEOUT
except ImportError: except ImportError:
_GLOBAL_DEFAULT_TIMEOUT = object() _GLOBAL_DEFAULT_TIMEOUT = object()
class GreenSocket(object): class GreenSocket(object):
""" """
@@ -117,14 +110,14 @@ class GreenSocket(object):
self.timeout = fd.gettimeout() or socket.getdefaulttimeout() self.timeout = fd.gettimeout() or socket.getdefaulttimeout()
except AttributeError: except AttributeError:
self.timeout = socket.getdefaulttimeout() self.timeout = socket.getdefaulttimeout()
set_nonblocking(fd) set_nonblocking(fd)
self.fd = fd self.fd = fd
self.closed = False self.closed = False
# when client calls setblocking(0) or settimeout(0) the socket must # when client calls setblocking(0) or settimeout(0) the socket must
# act non-blocking # act non-blocking
self.act_non_blocking = False self.act_non_blocking = False
@property @property
def _sock(self): def _sock(self):
return self return self
@@ -195,11 +188,11 @@ class GreenSocket(object):
else: else:
end = time.time() + self.gettimeout() end = time.time() + self.gettimeout()
while True: while True:
if socket_connect(fd, address):
return 0
if time.time() >= end:
raise socket.timeout(errno.EAGAIN)
try: try:
if socket_connect(fd, address):
return 0
if time.time() >= end:
raise socket.timeout(errno.EAGAIN)
trampoline(fd, write=True, timeout=end-time.time(), trampoline(fd, write=True, timeout=end-time.time(),
timeout_exc=socket.timeout(errno.EAGAIN)) timeout_exc=socket.timeout(errno.EAGAIN))
except socket.error, ex: except socket.error, ex:
@@ -254,9 +247,9 @@ class GreenSocket(object):
return '' return ''
else: else:
raise raise
trampoline(fd, trampoline(fd,
read=True, read=True,
timeout=self.timeout, timeout=self.timeout,
timeout_exc=socket.timeout("timed out")) timeout_exc=socket.timeout("timed out"))
def recvfrom(self, *args): def recvfrom(self, *args):
@@ -302,7 +295,6 @@ class GreenSocket(object):
return total_sent return total_sent
def sendall(self, data, flags=0): def sendall(self, data, flags=0):
fd = self.fd
tail = self.send(data, flags) tail = self.send(data, flags)
len_data = len(data) len_data = len(data)
while tail < len_data: while tail < len_data:
@@ -357,11 +349,11 @@ class GreenPipe(object):
self.fd = fd self.fd = fd
self.closed = False self.closed = False
self.recvbuffer = '' self.recvbuffer = ''
def close(self): def close(self):
self.fd.close() self.fd.close()
self.closed = True self.closed = True
def fileno(self): def fileno(self):
return self.fd.fileno() return self.fd.fileno()
@@ -375,7 +367,7 @@ class GreenPipe(object):
try: try:
return fd.read(buflen) return fd.read(buflen)
except IOError, e: except IOError, e:
if e[0] != EAGAIN: if e[0] != errno.EAGAIN:
return '' return ''
except socket.error, e: except socket.error, e:
if e[0] == errno.EPIPE: if e[0] == errno.EPIPE:
@@ -407,7 +399,7 @@ class GreenPipe(object):
fd.flush() fd.flush()
return len(data) return len(data)
except IOError, e: except IOError, e:
if e[0] != EAGAIN: if e[0] != errno.EAGAIN:
raise raise
except ValueError, e: except ValueError, e:
# what's this for? # what's this for?
@@ -419,7 +411,7 @@ class GreenPipe(object):
def flush(self): def flush(self):
pass pass
def readuntil(self, terminator, size=None): def readuntil(self, terminator, size=None):
buf, self.recvbuffer = self.recvbuffer, '' buf, self.recvbuffer = self.recvbuffer, ''
checked = 0 checked = 0
@@ -449,7 +441,7 @@ class GreenPipe(object):
buf += d buf += d
chunk, self.recvbuffer = buf[:size], buf[size:] chunk, self.recvbuffer = buf[:size], buf[size:]
return chunk return chunk
def readline(self, size=None): def readline(self, size=None):
return self.readuntil(self.newlines, size=size) return self.readuntil(self.newlines, size=size)
@@ -484,24 +476,24 @@ except ImportError:
class SSL(object): class SSL(object):
class WantWriteError(object): class WantWriteError(object):
pass pass
class WantReadError(object): class WantReadError(object):
pass pass
class ZeroReturnError(object): class ZeroReturnError(object):
pass pass
class SysCallError(object): class SysCallError(object):
pass pass
def shutdown_safe(sock): def shutdown_safe(sock):
""" Shuts down the socket. This is a convenience method for """ Shuts down the socket. This is a convenience method for
code that wants to gracefully handle regular sockets, SSL.Connection code that wants to gracefully handle regular sockets, SSL.Connection
sockets from PyOpenSSL and ssl.SSLSocket objects from Python 2.6 sockets from PyOpenSSL and ssl.SSLSocket objects from Python 2.6
interchangeably. Both types of ssl socket require a shutdown() before interchangeably. Both types of ssl socket require a shutdown() before
close, but they have different arity on their shutdown method. close, but they have different arity on their shutdown method.
Regular sockets don't need a shutdown before close, but it doesn't hurt. Regular sockets don't need a shutdown before close, but it doesn't hurt.
""" """
try: try:
@@ -516,11 +508,11 @@ def shutdown_safe(sock):
# this will often be the case in an http server context # this will often be the case in an http server context
if e[0] != errno.ENOTCONN: if e[0] != errno.ENOTCONN:
raise raise
def connect(addr, family=socket.AF_INET, bind=None): def connect(addr, family=socket.AF_INET, bind=None):
"""Convenience function for opening client sockets. """Convenience function for opening client sockets.
:param addr: Address of the server to connect to. For TCP sockets, this is a (host, port) tuple. :param addr: Address of the server to connect to. For TCP sockets, this is a (host, port) tuple.
:param family: Socket family, optional. See :mod:`socket` documentation for available families. :param family: Socket family, optional. See :mod:`socket` documentation for available families.
:param bind: Local address to bind to, optional. :param bind: Local address to bind to, optional.
@@ -531,14 +523,14 @@ def connect(addr, family=socket.AF_INET, bind=None):
sock.bind(bind) sock.bind(bind)
sock.connect(addr) sock.connect(addr)
return sock return sock
def listen(addr, family=socket.AF_INET, backlog=50): def listen(addr, family=socket.AF_INET, backlog=50):
"""Convenience function for opening server sockets. This """Convenience function for opening server sockets. This
socket can be used in an ``accept()`` loop. socket can be used in an ``accept()`` loop.
Sets SO_REUSEADDR on the socket to save on annoyance. Sets SO_REUSEADDR on the socket to save on annoyance.
:param addr: Address to listen on. For TCP sockets, this is a (host, port) tuple. :param addr: Address to listen on. For TCP sockets, this is a (host, port) tuple.
:param family: Socket family, optional. See :mod:`socket` documentation for available families. :param family: Socket family, optional. See :mod:`socket` documentation for available families.
:param backlog: The maximum number of queued connections. Should be at least 1; the maximum value is system-dependent. :param backlog: The maximum number of queued connections. Should be at least 1; the maximum value is system-dependent.
@@ -552,41 +544,40 @@ def listen(addr, family=socket.AF_INET, backlog=50):
def wrap_ssl(sock, keyfile=None, certfile=None, server_side=False, def wrap_ssl(sock, keyfile=None, certfile=None, server_side=False,
cert_reqs=None, ssl_version=None, ca_certs=None, cert_reqs=None, ssl_version=None, ca_certs=None,
do_handshake_on_connect=True, suppress_ragged_eofs=True): do_handshake_on_connect=True, suppress_ragged_eofs=True):
"""Convenience function for converting a regular socket into an SSL """Convenience function for converting a regular socket into an SSL
socket. Has the same interface as :func:`ssl.wrap_socket`, but socket. Has the same interface as :func:`ssl.wrap_socket`, but
works on 2.5 or earlier, using PyOpenSSL. works on 2.5 or earlier, using PyOpenSSL.
The preferred idiom is to call wrap_ssl directly on the creation The preferred idiom is to call wrap_ssl directly on the creation
method, e.g., ``wrap_ssl(connect(addr))`` or method, e.g., ``wrap_ssl(connect(addr))`` or
``wrap_ssl(listen(addr), server_side=True)``. This way there is ``wrap_ssl(listen(addr), server_side=True)``. This way there is
no "naked" socket sitting around to accidentally corrupt the SSL no "naked" socket sitting around to accidentally corrupt the SSL
session. session.
:return Green SSL object. :return Green SSL object.
""" """
pass pass
def serve(sock, handle, concurrency=1000): def serve(sock, handle, concurrency=1000):
"""Runs a server on the supplied socket. Calls the function """Runs a server on the supplied socket. Calls the function
*handle* in a separate greenthread for every incoming request. *handle* in a separate greenthread for every incoming request.
This function blocks the calling greenthread; it won't return until This function blocks the calling greenthread; it won't return until
the server completes. If you desire an immediate return, the server completes. If you desire an immediate return,
spawn a new greenthread for :func:`serve`. spawn a new greenthread for :func:`serve`.
The *handle* function must raise an EndServerException to The *handle* function must raise an EndServerException to
gracefully terminate the server -- that's the only way to get the gracefully terminate the server -- that's the only way to get the
server() function to return. Any other uncaught exceptions raised server() function to return. Any other uncaught exceptions raised
in *handle* are raised as exceptions from :func:`serve`, so be in *handle* are raised as exceptions from :func:`serve`, so be
sure to do a good job catching exceptions that your application sure to do a good job catching exceptions that your application
raises. The return value of *handle* is ignored. raises. The return value of *handle* is ignored.
The value in *concurrency* controls the maximum number of The value in *concurrency* controls the maximum number of
greenthreads that will be open at any time handling requests. When greenthreads that will be open at any time handling requests. When
the server hits the concurrency limit, it stops accepting new the server hits the concurrency limit, it stops accepting new
connections until the existing ones complete. connections until the existing ones complete.
""" """
pass pass

View File

@@ -10,7 +10,7 @@ from eventlet.support import greenlets as greenlet
__all__ = ['GreenPool', 'GreenPile'] __all__ = ['GreenPool', 'GreenPile']
DEBUG = False DEBUG = False
try: try:
next next
except NameError: except NameError:
@@ -28,20 +28,20 @@ class GreenPool(object):
self.coroutines_running = set() self.coroutines_running = set()
self.sem = semaphore.Semaphore(size) self.sem = semaphore.Semaphore(size)
self.no_coros_running = event.Event() self.no_coros_running = event.Event()
def resize(self, new_size): def resize(self, new_size):
""" Change the max number of greenthreads doing work at any given time. """ Change the max number of greenthreads doing work at any given time.
If resize is called when there are more than *new_size* greenthreads If resize is called when there are more than *new_size* greenthreads
already working on tasks, they will be allowed to complete but no new already working on tasks, they will be allowed to complete but no new
tasks will be allowed to get launched until enough greenthreads finish tasks will be allowed to get launched until enough greenthreads finish
their tasks to drop the overall quantity below *new_size*. Until their tasks to drop the overall quantity below *new_size*. Until
then, the return value of free() will be negative. then, the return value of free() will be negative.
""" """
size_delta = new_size - self.size size_delta = new_size - self.size
self.sem.counter += size_delta self.sem.counter += size_delta
self.size = new_size self.size = new_size
def running(self): def running(self):
""" Returns the number of greenthreads that are currently executing """ Returns the number of greenthreads that are currently executing
functions in the Parallel's pool.""" functions in the Parallel's pool."""
@@ -49,20 +49,20 @@ class GreenPool(object):
def free(self): def free(self):
""" Returns the number of greenthreads available for use. """ Returns the number of greenthreads available for use.
If zero or less, the next call to :meth:`spawn` or :meth:`spawn_n` will If zero or less, the next call to :meth:`spawn` or :meth:`spawn_n` will
block the calling greenthread until a slot becomes available.""" block the calling greenthread until a slot becomes available."""
return self.sem.counter return self.sem.counter
def spawn(self, function, *args, **kwargs): def spawn(self, function, *args, **kwargs):
"""Run the *function* with its arguments in its own green thread. """Run the *function* with its arguments in its own green thread.
Returns the :class:`GreenThread <eventlet.greenthread.GreenThread>` Returns the :class:`GreenThread <eventlet.greenthread.GreenThread>`
object that is running the function, which can be used to retrieve the object that is running the function, which can be used to retrieve the
results. results.
If the pool is currently at capacity, ``spawn`` will block until one of If the pool is currently at capacity, ``spawn`` will block until one of
the running greenthreads completes its task and frees up a slot. the running greenthreads completes its task and frees up a slot.
This function is reentrant; *function* can call ``spawn`` on the same This function is reentrant; *function* can call ``spawn`` on the same
pool without risk of deadlocking the whole thing. pool without risk of deadlocking the whole thing.
""" """
@@ -82,7 +82,7 @@ class GreenPool(object):
self.coroutines_running.add(gt) self.coroutines_running.add(gt)
gt.link(self._spawn_done) gt.link(self._spawn_done)
return gt return gt
def _spawn_n_impl(self, func, args, kwargs, coro): def _spawn_n_impl(self, func, args, kwargs, coro):
try: try:
try: try:
@@ -98,10 +98,10 @@ class GreenPool(object):
else: else:
coro = greenthread.getcurrent() coro = greenthread.getcurrent()
self._spawn_done(coro) self._spawn_done(coro)
def spawn_n(self, function, *args, **kwargs): def spawn_n(self, function, *args, **kwargs):
""" Create a greenthread to run the *function*, the same as """ Create a greenthread to run the *function*, the same as
:meth:`spawn`. The difference is that :meth:`spawn_n` returns :meth:`spawn`. The difference is that :meth:`spawn_n` returns
None; the results of *function* are not retrievable. None; the results of *function* are not retrievable.
""" """
# if reentering an empty pool, don't try to wait on a coroutine freeing # if reentering an empty pool, don't try to wait on a coroutine freeing
@@ -111,7 +111,7 @@ class GreenPool(object):
self._spawn_n_impl(function, args, kwargs, None) self._spawn_n_impl(function, args, kwargs, None)
else: else:
self.sem.acquire() self.sem.acquire()
g = greenthread.spawn_n(self._spawn_n_impl, g = greenthread.spawn_n(self._spawn_n_impl,
function, args, kwargs, True) function, args, kwargs, True)
if not self.coroutines_running: if not self.coroutines_running:
self.no_coros_running = event.Event() self.no_coros_running = event.Event()
@@ -121,7 +121,7 @@ class GreenPool(object):
"""Waits until all greenthreads in the pool are finished working.""" """Waits until all greenthreads in the pool are finished working."""
if self.running(): if self.running():
self.no_coros_running.wait() self.no_coros_running.wait()
def _spawn_done(self, coro): def _spawn_done(self, coro):
self.sem.release() self.sem.release()
if coro is not None: if coro is not None:
@@ -130,25 +130,25 @@ class GreenPool(object):
# we can finish off any waitall() calls that might be pending # we can finish off any waitall() calls that might be pending
if self.sem.balance == self.size: if self.sem.balance == self.size:
self.no_coros_running.send(None) self.no_coros_running.send(None)
def waiting(self): def waiting(self):
"""Return the number of greenthreads waiting to spawn. """Return the number of greenthreads waiting to spawn.
""" """
if self.sem.balance < 0: if self.sem.balance < 0:
return -self.sem.balance return -self.sem.balance
else: else:
return 0 return 0
def _do_map(self, func, it, gi): def _do_map(self, func, it, gi):
for args in it: for args in it:
gi.spawn(func, *args) gi.spawn(func, *args)
gi.spawn(return_stop_iteration) gi.spawn(return_stop_iteration)
def starmap(self, function, iterable): def starmap(self, function, iterable):
"""This is the same as :func:`itertools.starmap`, except that *func* is """This is the same as :func:`itertools.starmap`, except that *func* is
executed in a separate green thread for each item, with the concurrency executed in a separate green thread for each item, with the concurrency
limited by the pool's size. In operation, starmap consumes a constant limited by the pool's size. In operation, starmap consumes a constant
amount of memory, proportional to the size of the pool, and is thus amount of memory, proportional to the size of the pool, and is thus
suited for iterating over extremely long input lists. suited for iterating over extremely long input lists.
""" """
if function is None: if function is None:
@@ -163,22 +163,22 @@ class GreenPool(object):
""" """
return self.starmap(function, itertools.izip(*iterables)) return self.starmap(function, itertools.izip(*iterables))
def return_stop_iteration(): def return_stop_iteration():
return StopIteration() return StopIteration()
class GreenPile(object): class GreenPile(object):
"""GreenPile is an abstraction representing a bunch of I/O-related tasks. """GreenPile is an abstraction representing a bunch of I/O-related tasks.
Construct a GreenPile with an existing GreenPool object. The GreenPile will Construct a GreenPile with an existing GreenPool object. The GreenPile will
then use that pool's concurrency as it processes its jobs. There can be then use that pool's concurrency as it processes its jobs. There can be
many GreenPiles associated with a single GreenPool. many GreenPiles associated with a single GreenPool.
A GreenPile can also be constructed standalone, not associated with any A GreenPile can also be constructed standalone, not associated with any
GreenPool. To do this, construct it with an integer size parameter instead GreenPool. To do this, construct it with an integer size parameter instead
of a GreenPool. of a GreenPool.
It is not advisable to iterate over a GreenPile in a different greenthread It is not advisable to iterate over a GreenPile in a different greenthread
than the one which is calling spawn. The iterator will exit early in that than the one which is calling spawn. The iterator will exit early in that
situation. situation.
@@ -191,9 +191,9 @@ class GreenPile(object):
self.waiters = queue.LightQueue() self.waiters = queue.LightQueue()
self.used = False self.used = False
self.counter = 0 self.counter = 0
def spawn(self, func, *args, **kw): def spawn(self, func, *args, **kw):
"""Runs *func* in its own green thread, with the result available by """Runs *func* in its own green thread, with the result available by
iterating over the GreenPile object.""" iterating over the GreenPile object."""
self.used = True self.used = True
self.counter += 1 self.counter += 1
@@ -203,10 +203,10 @@ class GreenPile(object):
except: except:
self.counter -= 1 self.counter -= 1
raise raise
def __iter__(self): def __iter__(self):
return self return self
def next(self): def next(self):
"""Wait for the next result, suspending the current greenthread until it """Wait for the next result, suspending the current greenthread until it
is available. Raises StopIteration when there are no more results.""" is available. Raises StopIteration when there are no more results."""
@@ -216,15 +216,15 @@ class GreenPile(object):
return self.waiters.get().wait() return self.waiters.get().wait()
finally: finally:
self.counter -= 1 self.counter -= 1
# this is identical to GreenPile but it blocks on spawn if the results # this is identical to GreenPile but it blocks on spawn if the results
# aren't consumed, and it doesn't generate its own StopIteration exception, # aren't consumed, and it doesn't generate its own StopIteration exception,
# instead relying on the spawning process to send one in when it's done # instead relying on the spawning process to send one in when it's done
class GreenMap(GreenPile): class GreenMap(GreenPile):
def __init__(self, size_or_pool): def __init__(self, size_or_pool):
super(GreenMap, self).__init__(size_or_pool) super(GreenMap, self).__init__(size_or_pool)
self.waiters = queue.LightQueue(maxsize=self.pool.size) self.waiters = queue.LightQueue(maxsize=self.pool.size)
def next(self): def next(self):
try: try:
val = self.waiters.get().wait() val = self.waiters.get().wait()

View File

@@ -21,8 +21,8 @@ class FdListener(object):
def __repr__(self): def __repr__(self):
return "%s(%r, %r, %r)" % (type(self).__name__, self.evtype, self.fileno, self.cb) return "%s(%r, %r, %r)" % (type(self).__name__, self.evtype, self.fileno, self.cb)
__str__ = __repr__ __str__ = __repr__
# in debug mode, track the call site that created the listener # in debug mode, track the call site that created the listener
class DebugListener(FdListener): class DebugListener(FdListener):
def __init__(self, evtype, fileno, cb): def __init__(self, evtype, fileno, cb):
@@ -30,20 +30,21 @@ class DebugListener(FdListener):
self.greenlet = greenlet.getcurrent() self.greenlet = greenlet.getcurrent()
super(DebugListener, self).__init__(evtype, fileno, cb) super(DebugListener, self).__init__(evtype, fileno, cb)
def __repr__(self): def __repr__(self):
return "DebugListener(%r, %r, %r, %r)\n%sEndDebugFdListener" % (self.evtype, return "DebugListener(%r, %r, %r, %r)\n%sEndDebugFdListener" % (
self.fileno, self.evtype,
self.cb, self.fileno,
self.greenlet, self.cb,
''.join(self.where_called)) self.greenlet,
''.join(self.where_called))
__str__ = __repr__ __str__ = __repr__
class BaseHub(object): class BaseHub(object):
""" Base hub class for easing the implementation of subclasses that are """ Base hub class for easing the implementation of subclasses that are
specific to a particular underlying event architecture. """ specific to a particular underlying event architecture. """
SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit) SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit)
READ = READ READ = READ
WRITE = WRITE WRITE = WRITE
@@ -58,7 +59,7 @@ class BaseHub(object):
self.next_timers = [] self.next_timers = []
self.lclass = FdListener self.lclass = FdListener
self.debug_exceptions = True self.debug_exceptions = True
def add(self, evtype, fileno, cb): def add(self, evtype, fileno, cb):
""" Signals an intent to or write a particular file descriptor. """ Signals an intent to or write a particular file descriptor.
@@ -81,9 +82,9 @@ class BaseHub(object):
pass pass
if listener_list: if listener_list:
self.listeners[listener.evtype][listener.fileno] = listener_list self.listeners[listener.evtype][listener.fileno] = listener_list
def remove_descriptor(self, fileno): def remove_descriptor(self, fileno):
""" Completely remove all listeners for this fileno. For internal use """ Completely remove all listeners for this fileno. For internal use
only.""" only."""
self.listeners[READ].pop(fileno, None) self.listeners[READ].pop(fileno, None)
self.listeners[WRITE].pop(fileno, None) self.listeners[WRITE].pop(fileno, None)
@@ -106,7 +107,7 @@ class BaseHub(object):
if self.greenlet.dead: if self.greenlet.dead:
self.greenlet = greenlet.greenlet(self.run) self.greenlet = greenlet.greenlet(self.run)
try: try:
if self.greenlet.parent is not cur: if self.greenlet.parent is not cur:
cur.parent = self.greenlet cur.parent = self.greenlet
except ValueError: except ValueError:
pass # gets raised if there is a greenlet parent cycle pass # gets raised if there is a greenlet parent cycle
@@ -231,8 +232,6 @@ class BaseHub(object):
t = self.timers t = self.timers
heappop = heapq.heappop heappop = heapq.heappop
i = 0
while t: while t:
next = t[0] next = t[0]
@@ -265,12 +264,12 @@ class BaseHub(object):
def get_timers_count(hub): def get_timers_count(hub):
return max(len(hub.timers), len(hub.next_timers)) return max(len(hub.timers), len(hub.next_timers))
def set_debug_listeners(self, value): def set_debug_listeners(self, value):
if value: if value:
self.lclass = DebugListener self.lclass = DebugListener
else: else:
self.lclass = FdListener self.lclass = FdListener
def set_timer_exceptions(self, value): def set_timer_exceptions(self, value):
self.debug_exceptions = value self.debug_exceptions = value

View File

@@ -29,7 +29,7 @@ class event_wrapper(object):
if self.impl is not None: if self.impl is not None:
self.impl.delete() self.impl.delete()
self.impl = None self.impl = None
@property @property
def pending(self): def pending(self):
return bool(self.impl and self.impl.pending()) return bool(self.impl and self.impl.pending())
@@ -41,9 +41,11 @@ class Hub(BaseHub):
def __init__(self): def __init__(self):
super(Hub,self).__init__() super(Hub,self).__init__()
event.init() event.init()
self.signal_exc_info = None self.signal_exc_info = None
self.signal(2, lambda signalnum, frame: self.greenlet.parent.throw(KeyboardInterrupt)) self.signal(
2,
lambda signalnum, frame: self.greenlet.parent.throw(KeyboardInterrupt))
self.events_to_add = [] self.events_to_add = []
def dispatch(self): def dispatch(self):
@@ -76,7 +78,8 @@ class Hub(BaseHub):
raise raise
except: except:
if self.signal_exc_info is not None: if self.signal_exc_info is not None:
self.schedule_call_global(0, greenlet.getcurrent().parent.throw, *self.signal_exc_info) self.schedule_call_global(
0, greenlet.getcurrent().parent.throw, *self.signal_exc_info)
self.signal_exc_info = None self.signal_exc_info = None
else: else:
self.squelch_timer_exception(None, sys.exc_info()) self.squelch_timer_exception(None, sys.exc_info())
@@ -86,25 +89,25 @@ class Hub(BaseHub):
def _getrunning(self): def _getrunning(self):
return bool(self.greenlet) return bool(self.greenlet)
def _setrunning(self, value): def _setrunning(self, value):
pass # exists for compatibility with BaseHub pass # exists for compatibility with BaseHub
running = property(_getrunning, _setrunning) running = property(_getrunning, _setrunning)
def add(self, evtype, fileno, real_cb): def add(self, evtype, fileno, real_cb):
# this is stupid: pyevent won't call a callback unless it's a function, # this is stupid: pyevent won't call a callback unless it's a function,
# so we have to force it to be one here # so we have to force it to be one here
if isinstance(real_cb, types.BuiltinMethodType): if isinstance(real_cb, types.BuiltinMethodType):
def cb(_d): def cb(_d):
real_cb(_d) real_cb(_d)
else: else:
cb = real_cb cb = real_cb
if evtype is READ: if evtype is READ:
evt = event.read(fileno, cb, fileno) evt = event.read(fileno, cb, fileno)
elif evtype is WRITE: elif evtype is WRITE:
evt = event.write(fileno, cb, fileno) evt = event.write(fileno, cb, fileno)
listener = FdListener(evtype, fileno, evt) listener = FdListener(evtype, fileno, evt)
self.listeners[evtype].setdefault(fileno, []).append(listener) self.listeners[evtype].setdefault(fileno, []).append(listener)
return listener return listener
@@ -117,22 +120,22 @@ class Hub(BaseHub):
self.signal_exc_info = sys.exc_info() self.signal_exc_info = sys.exc_info()
event.abort() event.abort()
return event_wrapper(event.signal(signalnum, wrapper)) return event_wrapper(event.signal(signalnum, wrapper))
def remove(self, listener): def remove(self, listener):
super(Hub, self).remove(listener) super(Hub, self).remove(listener)
listener.cb.delete() listener.cb.delete()
def remove_descriptor(self, fileno): def remove_descriptor(self, fileno):
for lcontainer in self.listeners.itervalues(): for lcontainer in self.listeners.itervalues():
l_list = lcontainer.pop(fileno, None) l_list = lcontainer.pop(fileno, None)
for listener in l_list: for listener in l_list:
try: try:
listener.cb.delete() listener.cb.delete()
except SYSTEM_EXCEPTIONS: except self.SYSTEM_EXCEPTIONS:
raise raise
except: except:
traceback.print_exc() traceback.print_exc()
def schedule_call_local(self, seconds, cb, *args, **kwargs): def schedule_call_local(self, seconds, cb, *args, **kwargs):
current = greenlet.getcurrent() current = greenlet.getcurrent()
if current is self.greenlet: if current is self.greenlet:
@@ -149,7 +152,7 @@ class Hub(BaseHub):
wrapper = event_wrapper(event_impl, seconds=seconds) wrapper = event_wrapper(event_impl, seconds=seconds)
self.events_to_add.append(wrapper) self.events_to_add.append(wrapper)
return wrapper return wrapper
def _version_info(self): def _version_info(self):
baseversion = event.__version__ baseversion = event.__version__
return baseversion return baseversion
@@ -169,4 +172,3 @@ def _scheduled_call_local(event_impl, handle, evtype, arg):
cb(*args, **kwargs) cb(*args, **kwargs)
finally: finally:
event_impl.delete() event_impl.delete()

View File

@@ -49,7 +49,7 @@ class socket_rwdescriptor(FdListener):
super(socket_rwdescriptor, self).__init__(evtype, fileno, cb) super(socket_rwdescriptor, self).__init__(evtype, fileno, cb)
if not isinstance(fileno, (int,long)): if not isinstance(fileno, (int,long)):
raise TypeError("Expected int or long, got %s" % type(fileno)) raise TypeError("Expected int or long, got %s" % type(fileno))
# Twisted expects fileno to be a callable, not an attribute # Twisted expects fileno to be a callable, not an attribute
def _fileno(): def _fileno():
return fileno return fileno
self.fileno = _fileno self.fileno = _fileno
@@ -74,8 +74,8 @@ class socket_rwdescriptor(FdListener):
# to the mainloop occurs, twisted will not re-evaluate the delayed calls # to the mainloop occurs, twisted will not re-evaluate the delayed calls
# because it assumes that none were scheduled since no client code was executed # because it assumes that none were scheduled since no client code was executed
# (it has no idea it was switched away). So, we restart the mainloop. # (it has no idea it was switched away). So, we restart the mainloop.
# XXX this is not enough, pollreactor prints the traceback for this and epollreactor # XXX this is not enough, pollreactor prints the traceback for
# times out. see test__hub.TestCloseSocketWhilePolling # this and epollreactor times out. see test__hub.TestCloseSocketWhilePolling
raise greenlet.GreenletExit raise greenlet.GreenletExit
logstr = "twistedr" logstr = "twistedr"
@@ -95,7 +95,7 @@ class BaseTwistedHub(object):
# XXX: remove me from here. make functions that depend on reactor # XXX: remove me from here. make functions that depend on reactor
# XXX: hub's methods # XXX: hub's methods
uses_twisted_reactor = True uses_twisted_reactor = True
WRITE = WRITE WRITE = WRITE
READ = READ READ = READ
@@ -103,9 +103,10 @@ class BaseTwistedHub(object):
self.greenlet = mainloop_greenlet self.greenlet = mainloop_greenlet
def switch(self): def switch(self):
assert api.getcurrent() is not self.greenlet, "Cannot switch from MAINLOOP to MAINLOOP" assert getcurrent() is not self.greenlet, \
"Cannot switch from MAINLOOP to MAINLOOP"
try: try:
api.getcurrent().parent = self.greenlet getcurrent().parent = self.greenlet
except ValueError: except ValueError:
pass pass
return self.greenlet.switch() return self.greenlet.switch()
@@ -127,14 +128,15 @@ class BaseTwistedHub(object):
from twisted.internet import reactor from twisted.internet import reactor
reactor.removeReader(descriptor) reactor.removeReader(descriptor)
reactor.removeWriter(descriptor) reactor.removeWriter(descriptor)
def schedule_call_local(self, seconds, func, *args, **kwargs): def schedule_call_local(self, seconds, func, *args, **kwargs):
from twisted.internet import reactor from twisted.internet import reactor
def call_if_greenlet_alive(*args1, **kwargs1): def call_if_greenlet_alive(*args1, **kwargs1):
if timer.greenlet.dead: if timer.greenlet.dead:
return return
return func(*args1, **kwargs1) return func(*args1, **kwargs1)
timer = callLater(LocalDelayedCall, reactor, seconds, call_if_greenlet_alive, *args, **kwargs) timer = callLater(LocalDelayedCall, reactor, seconds,
call_if_greenlet_alive, *args, **kwargs)
return timer return timer
schedule_call = schedule_call_local schedule_call = schedule_call_local
@@ -189,18 +191,22 @@ class TwistedHub(BaseTwistedHub):
installSignalHandlers = False installSignalHandlers = False
def __init__(self): def __init__(self):
assert Hub.state==0, ('%s hub can only be instantiated once' % type(self).__name__, Hub.state) assert Hub.state==0, ('%s hub can only be instantiated once'%type(self).__name__,
Hub.state)
Hub.state = 1 Hub.state = 1
make_twisted_threadpool_daemonic() # otherwise the program would hang after the main greenlet exited make_twisted_threadpool_daemonic() # otherwise the program
g = api.Greenlet(self.run) # would hang after the main
# greenlet exited
g = greenlet.greenlet(self.run)
BaseTwistedHub.__init__(self, g) BaseTwistedHub.__init__(self, g)
def switch(self): def switch(self):
assert api.getcurrent() is not self.greenlet, "Cannot switch from MAINLOOP to MAINLOOP" assert getcurrent() is not self.greenlet, \
"Cannot switch from MAINLOOP to MAINLOOP"
if self.greenlet.dead: if self.greenlet.dead:
self.greenlet = api.Greenlet(self.run) self.greenlet = greenlet.greenlet(self.run)
try: try:
api.getcurrent().parent = self.greenlet getcurrent().parent = self.greenlet
except ValueError: except ValueError:
pass pass
return self.greenlet.switch() return self.greenlet.switch()
@@ -255,5 +261,3 @@ def make_twisted_threadpool_daemonic():
from twisted.python.threadpool import ThreadPool from twisted.python.threadpool import ThreadPool
if ThreadPool.threadFactory != DaemonicThread: if ThreadPool.threadFactory != DaemonicThread:
ThreadPool.threadFactory = DaemonicThread ThreadPool.threadFactory = DaemonicThread

View File

@@ -7,17 +7,17 @@ __exclude = set(('__builtins__', '__file__', '__name__'))
def inject(module_name, new_globals, *additional_modules): def inject(module_name, new_globals, *additional_modules):
"""Base method for "injecting" greened modules into an imported module. It """Base method for "injecting" greened modules into an imported module. It
imports the module specified in *module_name*, arranging things so imports the module specified in *module_name*, arranging things so
that the already-imported modules in *additional_modules* are used when that the already-imported modules in *additional_modules* are used when
*module_name* makes its imports. *module_name* makes its imports.
*new_globals* is either None or a globals dictionary that gets populated *new_globals* is either None or a globals dictionary that gets populated
with the contents of the *module_name* module. This is useful when creating with the contents of the *module_name* module. This is useful when creating
a "green" version of some other module. a "green" version of some other module.
*additional_modules* should be a collection of two-element tuples, of the *additional_modules* should be a collection of two-element tuples, of the
form (<name>, <module>). If it's not specified, a default selection of form (<name>, <module>). If it's not specified, a default selection of
name/module pairs is used, which should cover all use cases but may be name/module pairs is used, which should cover all use cases but may be
slower because there are inevitably redundant or unnecessary imports. slower because there are inevitably redundant or unnecessary imports.
""" """
if not additional_modules: if not additional_modules:
@@ -26,16 +26,17 @@ def inject(module_name, new_globals, *additional_modules):
_green_os_modules() + _green_os_modules() +
_green_select_modules() + _green_select_modules() +
_green_socket_modules() + _green_socket_modules() +
_green_thread_modules() + _green_thread_modules() +
_green_time_modules()) _green_time_modules())
## Put the specified modules in sys.modules for the duration of the import ## Put the specified modules in sys.modules for the duration of the import
saved = {} saved = {}
for name, mod in additional_modules: for name, mod in additional_modules:
saved[name] = sys.modules.get(name, None) saved[name] = sys.modules.get(name, None)
sys.modules[name] = mod sys.modules[name] = mod
## Remove the old module from sys.modules and reimport it while the specified modules are in place ## Remove the old module from sys.modules and reimport it while
## the specified modules are in place
old_module = sys.modules.pop(module_name, None) old_module = sys.modules.pop(module_name, None)
try: try:
module = __import__(module_name, {}, {}, module_name.split('.')[:-1]) module = __import__(module_name, {}, {}, module_name.split('.')[:-1])
@@ -66,20 +67,20 @@ def inject(module_name, new_globals, *additional_modules):
def import_patched(module_name, *additional_modules, **kw_additional_modules): def import_patched(module_name, *additional_modules, **kw_additional_modules):
"""Imports a module in a way that ensures that the module uses "green" """Imports a module in a way that ensures that the module uses "green"
versions of the standard library modules, so that everything works versions of the standard library modules, so that everything works
nonblockingly. nonblockingly.
The only required argument is the name of the module to be imported. The only required argument is the name of the module to be imported.
""" """
return inject( return inject(
module_name, module_name,
None, None,
*additional_modules + tuple(kw_additional_modules.items())) *additional_modules + tuple(kw_additional_modules.items()))
def patch_function(func, *additional_modules): def patch_function(func, *additional_modules):
"""Huge hack here -- patches the specified modules for the """Huge hack here -- patches the specified modules for the
duration of the function call.""" duration of the function call."""
if not additional_modules: if not additional_modules:
# supply some defaults # supply some defaults
@@ -105,7 +106,7 @@ def patch_function(func, *additional_modules):
else: else:
del sys.modules[name] del sys.modules[name]
return patched return patched
_originals = {} _originals = {}
def original(modname): def original(modname):
mod = _originals.get(modname) mod = _originals.get(modname)
@@ -122,18 +123,18 @@ def original(modname):
return _originals.get(modname) return _originals.get(modname)
already_patched = {} already_patched = {}
def monkey_patch(all=True, os=False, select=False, def monkey_patch(all=True, os=False, select=False,
socket=False, thread=False, time=False): socket=False, thread=False, time=False):
"""Globally patches certain system modules to be greenthread-friendly. """Globally patches certain system modules to be greenthread-friendly.
The keyword arguments afford some control over which modules are patched. The keyword arguments afford some control over which modules are patched.
If *all* is True, then all modules are patched regardless of the other If *all* is True, then all modules are patched regardless of the other
arguments. If it's False, then the rest of the keyword arguments control arguments. If it's False, then the rest of the keyword arguments control
patching of specific subsections of the standard library. patching of specific subsections of the standard library.
Most patch the single module of the same name (os, time, Most patch the single module of the same name (os, time,
select). The exceptions are socket, which also patches the ssl module if select). The exceptions are socket, which also patches the ssl module if
present; and thread, which patches thread, threading, and Queue. present; and thread, which patches thread, threading, and Queue.
It's safe to call monkey_patch multiple times. It's safe to call monkey_patch multiple times.
""" """
modules_to_patch = [] modules_to_patch = []
@@ -144,7 +145,7 @@ def monkey_patch(all=True, os=False, select=False,
modules_to_patch += _green_select_modules() modules_to_patch += _green_select_modules()
already_patched['select'] = True already_patched['select'] = True
if all or socket and not already_patched.get('socket'): if all or socket and not already_patched.get('socket'):
modules_to_patch += _green_socket_modules() modules_to_patch += _green_socket_modules()
already_patched['socket'] = True already_patched['socket'] = True
if all or thread and not already_patched.get('thread'): if all or thread and not already_patched.get('thread'):
# hacks ahead # hacks ahead
@@ -156,19 +157,20 @@ def monkey_patch(all=True, os=False, select=False,
if all or time and not already_patched.get('time'): if all or time and not already_patched.get('time'):
modules_to_patch += _green_time_modules() modules_to_patch += _green_time_modules()
already_patched['time'] = True already_patched['time'] = True
for name, mod in modules_to_patch: for name, mod in modules_to_patch:
orig_mod = sys.modules.get(name) orig_mod = sys.modules.get(name)
for attr in mod.__patched__: for attr_name in mod.__patched__:
orig_attr = getattr(orig_mod, attr, None) #orig_attr = getattr(orig_mod, attr_name, None)
patched_attr = getattr(mod, attr, None) # @@tavis: line above wasn't used, not sure what author intended
patched_attr = getattr(mod, attr_name, None)
if patched_attr is not None: if patched_attr is not None:
setattr(orig_mod, attr, patched_attr) setattr(orig_mod, attr_name, patched_attr)
def _green_os_modules(): def _green_os_modules():
from eventlet.green import os from eventlet.green import os
return [('os', os)] return [('os', os)]
def _green_select_modules(): def _green_select_modules():
from eventlet.green import select from eventlet.green import select
return [('select', select)] return [('select', select)]
@@ -186,7 +188,7 @@ def _green_thread_modules():
from eventlet.green import thread from eventlet.green import thread
from eventlet.green import threading from eventlet.green import threading
return [('Queue', Queue), ('thread', thread), ('threading', threading)] return [('Queue', Queue), ('thread', thread), ('threading', threading)]
def _green_time_modules(): def _green_time_modules():
from eventlet.green import time from eventlet.green import time
return [('time', time)] return [('time', time)]

View File

@@ -10,7 +10,7 @@ try:
exec(''' exec('''
@contextmanager @contextmanager
def item_impl(self): def item_impl(self):
""" Get an object out of the pool, for use with with statement. """ Get an object out of the pool, for use with with statement.
>>> from eventlet import pools >>> from eventlet import pools
>>> pool = pools.TokenPool(max_size=4) >>> pool = pools.TokenPool(max_size=4)
@@ -35,20 +35,20 @@ except ImportError:
class Pool(object): class Pool(object):
""" """
Pool is a base class that implements resource limitation and construction. Pool is a base class that implements resource limitation and construction.
It is meant to be subclassed. When subclassing, define only It is meant to be subclassed. When subclassing, define only
the :meth:`create` method to implement the desired resource:: the :meth:`create` method to implement the desired resource::
class MyPool(pools.Pool): class MyPool(pools.Pool):
def create(self): def create(self):
return MyObject() return MyObject()
If using 2.5 or greater, the :meth:`item` method acts as a context manager; If using 2.5 or greater, the :meth:`item` method acts as a context manager;
that's the best way to use it:: that's the best way to use it::
with mypool.item() as thing: with mypool.item() as thing:
thing.dostuff() thing.dostuff()
If stuck on 2.4, the :meth:`get` and :meth:`put` methods are the preferred If stuck on 2.4, the :meth:`get` and :meth:`put` methods are the preferred
nomenclature. Use a ``finally`` to ensure that nothing is leaked:: nomenclature. Use a ``finally`` to ensure that nothing is leaked::
thing = self.pool.get() thing = self.pool.get()
@@ -59,12 +59,12 @@ class Pool(object):
The maximum size of the pool can be modified at runtime via The maximum size of the pool can be modified at runtime via
the :meth:`resize` method. the :meth:`resize` method.
Specifying a non-zero *min-size* argument pre-populates the pool with Specifying a non-zero *min-size* argument pre-populates the pool with
*min_size* items. *max-size* sets a hard limit to the size of the pool -- *min_size* items. *max-size* sets a hard limit to the size of the pool --
it cannot contain any more items than *max_size*, and if there are already it cannot contain any more items than *max_size*, and if there are already
*max_size* items 'checked out' of the pool, the pool will cause any *max_size* items 'checked out' of the pool, the pool will cause any
greenthread calling :meth:`get` to cooperatively yield until an item greenthread calling :meth:`get` to cooperatively yield until an item
is :meth:`put` in. is :meth:`put` in.
""" """
def __init__(self, min_size=0, max_size=4, order_as_stack=False): def __init__(self, min_size=0, max_size=4, order_as_stack=False):
@@ -96,7 +96,7 @@ class Pool(object):
self.current_size += 1 self.current_size += 1
return created return created
return self.channel.get() return self.channel.get()
if item_impl is not None: if item_impl is not None:
item = item_impl item = item_impl
@@ -118,11 +118,11 @@ class Pool(object):
def resize(self, new_size): def resize(self, new_size):
"""Resize the pool to *new_size*. """Resize the pool to *new_size*.
Adjusting this number does not affect existing items checked out of Adjusting this number does not affect existing items checked out of
the pool, nor on any greenthreads who are waiting for an item to free the pool, nor on any greenthreads who are waiting for an item to free
up. Some indeterminate number of :meth:`get`/:meth:`put` up. Some indeterminate number of :meth:`get`/:meth:`put`
cycles will be necessary before the new maximum size truly matches cycles will be necessary before the new maximum size truly matches
the actual operation of the pool. the actual operation of the pool.
""" """
self.max_size = new_size self.max_size = new_size
@@ -137,18 +137,18 @@ class Pool(object):
"""Return the number of routines waiting for a pool item. """Return the number of routines waiting for a pool item.
""" """
return max(0, self.channel.getting() - self.channel.putting()) return max(0, self.channel.getting() - self.channel.putting())
def create(self): def create(self):
"""Generate a new pool item. This method must be overridden in order """Generate a new pool item. This method must be overridden in order
for the pool to function. It accepts no arguments and returns a single for the pool to function. It accepts no arguments and returns a single
instance of whatever thing the pool is supposed to contain. instance of whatever thing the pool is supposed to contain.
In general, :meth:`create` is called whenever the pool exceeds its In general, :meth:`create` is called whenever the pool exceeds its
previous high-water mark of concurrently-checked-out-items. In other previous high-water mark of concurrently-checked-out-items. In other
words, in a new pool with *min_size* of 0, the very first call words, in a new pool with *min_size* of 0, the very first call
to :meth:`get` will result in a call to :meth:`create`. If the first to :meth:`get` will result in a call to :meth:`create`. If the first
caller calls :meth:`put` before some other caller calls :meth:`get`, caller calls :meth:`put` before some other caller calls :meth:`get`,
then the first item will be returned, and :meth:`create` will not be then the first item will be returned, and :meth:`create` will not be
called a second time. called a second time.
""" """
raise NotImplementedError("Implement in subclass") raise NotImplementedError("Implement in subclass")
@@ -165,5 +165,3 @@ class TokenPool(Pool):
""" """
def create(self): def create(self):
return Token() return Token()

View File

@@ -1,5 +1,5 @@
import warnings import warnings
warnings.warn("eventlet.processes is deprecated in favor of " warnings.warn("eventlet.processes is deprecated in favor of "
"eventlet.green.subprocess, which is API-compatible with the standard " "eventlet.green.subprocess, which is API-compatible with the standard "
" library subprocess module.", " library subprocess module.",
DeprecationWarning, stacklevel=2) DeprecationWarning, stacklevel=2)
@@ -11,7 +11,6 @@ import popen2
import signal import signal
from eventlet import api from eventlet import api
from eventlet import coros
from eventlet import pools from eventlet import pools
from eventlet import greenio from eventlet import greenio
@@ -69,7 +68,9 @@ class Process(object):
greenio.set_nonblocking(child_stdout_stderr) greenio.set_nonblocking(child_stdout_stderr)
greenio.set_nonblocking(child_stdin) greenio.set_nonblocking(child_stdin)
self.child_stdout_stderr = greenio.GreenPipe(child_stdout_stderr) self.child_stdout_stderr = greenio.GreenPipe(child_stdout_stderr)
self.child_stdout_stderr.newlines = '\n' # the default is \r\n, which aren't sent over pipes self.child_stdout_stderr.newlines = '\n' # the default is
# \r\n, which aren't sent over
# pipes
self.child_stdin = greenio.GreenPipe(child_stdin) self.child_stdin = greenio.GreenPipe(child_stdin)
self.child_stdin.newlines = '\n' self.child_stdin.newlines = '\n'

View File

@@ -36,7 +36,10 @@ def wrap(obj, dead_callback = None):
return wrap_module(obj.__name__, dead_callback) return wrap_module(obj.__name__, dead_callback)
pythonpath_sync() pythonpath_sync()
if _g_debug_mode: if _g_debug_mode:
p = Process(sys.executable, ["-W", "ignore", __file__, '--child', '--logfile', os.path.join(tempfile.gettempdir(), 'saranwrap.log')], dead_callback) p = Process(sys.executable,
["-W", "ignore", __file__, '--child',
'--logfile', os.path.join(tempfile.gettempdir(), 'saranwrap.log')],
dead_callback)
else: else:
p = Process(sys.executable, ["-W", "ignore", __file__, '--child'], dead_callback) p = Process(sys.executable, ["-W", "ignore", __file__, '--child'], dead_callback)
prox = Proxy(ChildProcess(p, p)) prox = Proxy(ChildProcess(p, p))
@@ -53,9 +56,13 @@ def wrap_module(fqname, dead_callback = None):
pythonpath_sync() pythonpath_sync()
global _g_debug_mode global _g_debug_mode
if _g_debug_mode: if _g_debug_mode:
p = Process(sys.executable, ["-W", "ignore", __file__, '--module', fqname, '--logfile', os.path.join(tempfile.gettempdir(), 'saranwrap.log')], dead_callback) p = Process(sys.executable,
["-W", "ignore", __file__, '--module', fqname,
'--logfile', os.path.join(tempfile.gettempdir(), 'saranwrap.log')],
dead_callback)
else: else:
p = Process(sys.executable, ["-W", "ignore", __file__, '--module', fqname,], dead_callback) p = Process(sys.executable,
["-W", "ignore", __file__, '--module', fqname,], dead_callback)
prox = Proxy(ChildProcess(p,p)) prox = Proxy(ChildProcess(p,p))
return prox return prox
@@ -140,7 +147,8 @@ def _write_request(param, output):
def _is_local(attribute): def _is_local(attribute):
"Return ``True`` if the attribute should be handled locally" "Return ``True`` if the attribute should be handled locally"
# return attribute in ('_in', '_out', '_id', '__getattribute__', '__setattr__', '__dict__') # return attribute in ('_in', '_out', '_id', '__getattribute__',
# '__setattr__', '__dict__')
# good enough for now. :) # good enough for now. :)
if '__local_dict' in attribute: if '__local_dict' in attribute:
return True return True
@@ -266,7 +274,8 @@ class Proxy(object):
my_cp = self.__local_dict['_cp'] my_cp = self.__local_dict['_cp']
my_id = self.__local_dict['_id'] my_id = self.__local_dict['_id']
# Pass the set attribute across # Pass the set attribute across
request = Request('setattr', {'id':my_id, 'attribute':attribute, 'value':value}) request = Request('setattr',
{'id':my_id, 'attribute':attribute, 'value':value})
return my_cp.make_request(request, attribute=attribute) return my_cp.make_request(request, attribute=attribute)
class ObjectProxy(Proxy): class ObjectProxy(Proxy):
@@ -324,7 +333,8 @@ class ObjectProxy(Proxy):
return self.__str__() return self.__str__()
def __nonzero__(self): def __nonzero__(self):
# bool(obj) is another method that skips __getattribute__. There's no good way to just pass # bool(obj) is another method that skips __getattribute__.
# There's no good way to just pass
# the method on, so we use a special message. # the method on, so we use a special message.
my_cp = self.__local_dict['_cp'] my_cp = self.__local_dict['_cp']
my_id = self.__local_dict['_id'] my_id = self.__local_dict['_id']
@@ -395,7 +405,9 @@ class CallableProxy(object):
# having already checked if the method starts with '_' so we # having already checked if the method starts with '_' so we
# can safely pass this one to the remote object. # can safely pass this one to the remote object.
#_prnt("calling %s %s" % (self._object_id, self._name) #_prnt("calling %s %s" % (self._object_id, self._name)
request = Request('call', {'id':self._object_id, 'name':self._name, 'args':args, 'kwargs':kwargs}) request = Request('call', {'id':self._object_id,
'name':self._name,
'args':args, 'kwargs':kwargs})
return self._cp.make_request(request, attribute=self._name) return self._cp.make_request(request, attribute=self._name)
class Server(object): class Server(object):
@@ -444,14 +456,15 @@ class Server(object):
def handle_setitem(self, obj, req): def handle_setitem(self, obj, req):
obj[req['key']] = req['value'] obj[req['key']] = req['value']
return None # *TODO figure out what the actual return value of __setitem__ should be return None # *TODO figure out what the actual return value
# of __setitem__ should be
def handle_eq(self, obj, req): def handle_eq(self, obj, req):
#_log("__eq__ %s %s" % (obj, req)) #_log("__eq__ %s %s" % (obj, req))
rhs = None rhs = None
try: try:
rhs = self._objects[req['rhs']] rhs = self._objects[req['rhs']]
except KeyError, e: except KeyError:
return False return False
return (obj == rhs) return (obj == rhs)
@@ -565,7 +578,7 @@ class Server(object):
#_log("objects: %s" % self._objects) #_log("objects: %s" % self._objects)
s = Pickle.dumps(body) s = Pickle.dumps(body)
_log(`s`) _log(`s`)
str_ = _write_lp_hunk(self._out, s) _write_lp_hunk(self._out, s)
def write_exception(self, e): def write_exception(self, e):
"""Helper method to respond with an exception.""" """Helper method to respond with an exception."""
@@ -621,14 +634,16 @@ def named(name):
import_err_strings.append(err.__str__()) import_err_strings.append(err.__str__())
toimport = '.'.join(toimport.split('.')[:-1]) toimport = '.'.join(toimport.split('.')[:-1])
if obj is None: if obj is None:
raise ImportError('%s could not be imported. Import errors: %r' % (name, import_err_strings)) raise ImportError(
'%s could not be imported. Import errors: %r' % (name, import_err_strings))
for seg in name.split('.')[1:]: for seg in name.split('.')[1:]:
try: try:
obj = getattr(obj, seg) obj = getattr(obj, seg)
except AttributeError: except AttributeError:
dirobj = dir(obj) dirobj = dir(obj)
dirobj.sort() dirobj.sort()
raise AttributeError('attribute %r missing from %r (%r) %r. Import errors: %r' % ( raise AttributeError(
'attribute %r missing from %r (%r) %r. Import errors: %r' % (
seg, obj, dirobj, name, import_err_strings)) seg, obj, dirobj, name, import_err_strings))
return obj return obj

View File

@@ -17,5 +17,6 @@ except ImportError, e:
except ImportError: except ImportError:
try: try:
from support.stacklesss import greenlet, getcurrent, GreenletExit from support.stacklesss import greenlet, getcurrent, GreenletExit
(greenlet, getcurrent, GreenletExit) # silence pyflakes
except ImportError, e: except ImportError, e:
raise ImportError("Unable to find an implementation of greenlet.") raise ImportError("Unable to find an implementation of greenlet.")

View File

@@ -30,7 +30,7 @@ class FirstSwitch(object):
gr.t = t gr.t = t
tasklet_to_greenlet[t] = gr tasklet_to_greenlet[t] = gr
t.setup(*args, **kw) t.setup(*args, **kw)
result = t.run() t.run()
class greenlet(object): class greenlet(object):
@@ -75,10 +75,10 @@ def emulate():
module.getcurrent = getcurrent module.getcurrent = getcurrent
module.GreenletExit = GreenletExit module.GreenletExit = GreenletExit
caller = t = stackless.getcurrent() caller = stackless.getcurrent()
tasklet_to_greenlet[t] = None tasklet_to_greenlet[caller] = None
main_coro = greenlet() main_coro = greenlet()
tasklet_to_greenlet[t] = main_coro tasklet_to_greenlet[caller] = main_coro
main_coro.t = t main_coro.t = caller
del main_coro.switch ## It's already running del main_coro.switch ## It's already running
coro_args[main_coro] = None coro_args[main_coro] = None

View File

@@ -33,7 +33,7 @@ _rfile = _wfile = None
def _signal_t2e(): def _signal_t2e():
_wfile.write(' ') _wfile.write(' ')
_wfile.flush() _wfile.flush()
_reqq = None _reqq = None
_rspq = None _rspq = None
@@ -74,9 +74,13 @@ def tworker():
rv = meth(*args,**kwargs) rv = meth(*args,**kwargs)
except SYS_EXCS: except SYS_EXCS:
raise raise
except Exception,exn: except Exception:
rv = sys.exc_info() rv = sys.exc_info()
_rspq.put((e,rv)) _rspq.put((e,rv)) # @@tavis: not supposed to
# keep references to
# sys.exc_info() so it would
# be worthwhile testing
# if this leads to memory leaks
meth = args = kwargs = e = rv = None meth = args = kwargs = e = rv = None
_signal_t2e() _signal_t2e()
@@ -118,10 +122,10 @@ def proxy_call(autowrap, f, *args, **kwargs):
""" """
Call a function *f* and returns the value. If the type of the return value Call a function *f* and returns the value. If the type of the return value
is in the *autowrap* collection, then it is wrapped in a :class:`Proxy` is in the *autowrap* collection, then it is wrapped in a :class:`Proxy`
object before return. object before return.
Normally *f* will be called in the threadpool with :func:`execute`; if the Normally *f* will be called in the threadpool with :func:`execute`; if the
keyword argument "nonblocking" is set to ``True``, it will simply be keyword argument "nonblocking" is set to ``True``, it will simply be
executed directly. This is useful if you have an object which has methods executed directly. This is useful if you have an object which has methods
that don't need to be called in a separate thread, but which return objects that don't need to be called in a separate thread, but which return objects
that should be Proxy wrapped. that should be Proxy wrapped.
@@ -242,7 +246,7 @@ def setup():
_threads.add(t) _threads.add(t)
_coro = greenthread.spawn_n(tpool_trampoline) _coro = greenthread.spawn_n(tpool_trampoline)
def killall(): def killall():
global _setup_already, _reqq, _rspq, _rfile, _wfile global _setup_already, _reqq, _rspq, _rfile, _wfile

View File

@@ -4,10 +4,10 @@ You generally don't have to use it unless you need to call reactor.run()
yourself. yourself.
""" """
from eventlet.hubs.twistedr import BaseTwistedHub from eventlet.hubs.twistedr import BaseTwistedHub
from eventlet.api import use_hub, _threadlocal from eventlet import use_hub
from eventlet.support import greenlets as greenlet from eventlet.support import greenlets as greenlet
from eventlet.hubs import _threadlocal
use_hub(BaseTwistedHub) use_hub(BaseTwistedHub)
assert not hasattr(_threadlocal, 'hub') assert not hasattr(_threadlocal, 'hub')
hub = _threadlocal.hub = _threadlocal.Hub(greenlet.getcurrent()) hub = _threadlocal.hub = _threadlocal.Hub(greenlet.getcurrent())

View File

@@ -1,12 +1,11 @@
import os
import socket import socket
import errno
import warnings import warnings
from eventlet import greenio
def g_log(*args): def g_log(*args):
warnings.warn("eventlet.util.g_log is deprecated because we're pretty sure no one uses it. Send mail to eventletdev@lists.secondlife.com if you are actually using it.", warnings.warn("eventlet.util.g_log is deprecated because "
"we're pretty sure no one uses it. "
"Send mail to eventletdev@lists.secondlife.com "
"if you are actually using it.",
DeprecationWarning, stacklevel=2) DeprecationWarning, stacklevel=2)
import sys import sys
from eventlet.support import greenlets as greenlet from eventlet.support import greenlets as greenlet
@@ -42,21 +41,22 @@ try:
server_side=server_side, cert_reqs=ssl.CERT_NONE, server_side=server_side, cert_reqs=ssl.CERT_NONE,
ssl_version=ssl.PROTOCOL_SSLv23, ca_certs=None, ssl_version=ssl.PROTOCOL_SSLv23, ca_certs=None,
do_handshake_on_connect=True, do_handshake_on_connect=True,
suppress_ragged_eofs=True) suppress_ragged_eofs=True)
except ImportError: except ImportError:
# if ssl is not available, use PyOpenSSL # if ssl is not available, use PyOpenSSL
def wrap_ssl(sock, certificate=None, private_key=None, server_side=False): def wrap_ssl(sock, certificate=None, private_key=None, server_side=False):
try: try:
from eventlet.green.OpenSSL import SSL from eventlet.green.OpenSSL import SSL
except ImportError: except ImportError:
raise ImportError("To use SSL with Eventlet, you must install PyOpenSSL or use Python 2.6 or later.") raise ImportError("To use SSL with Eventlet, "
"you must install PyOpenSSL or use Python 2.6 or later.")
context = SSL.Context(SSL.SSLv23_METHOD) context = SSL.Context(SSL.SSLv23_METHOD)
if certificate is not None: if certificate is not None:
context.use_certificate_file(certificate) context.use_certificate_file(certificate)
if private_key is not None: if private_key is not None:
context.use_privatekey_file(private_key) context.use_privatekey_file(private_key)
context.set_verify(SSL.VERIFY_NONE, lambda *x: True) context.set_verify(SSL.VERIFY_NONE, lambda *x: True)
connection = SSL.Connection(context, sock) connection = SSL.Connection(context, sock)
if server_side: if server_side:
connection.set_accept_state() connection.set_accept_state()
@@ -65,22 +65,22 @@ except ImportError:
return connection return connection
def wrap_socket_with_coroutine_socket(use_thread_pool=None): def wrap_socket_with_coroutine_socket(use_thread_pool=None):
warnings.warn("eventlet.util.wrap_socket_with_coroutine_socket() is now " warnings.warn("eventlet.util.wrap_socket_with_coroutine_socket() is now "
"eventlet.patcher.monkey_patch(all=False, socket=True)", "eventlet.patcher.monkey_patch(all=False, socket=True)",
DeprecationWarning, stacklevel=2) DeprecationWarning, stacklevel=2)
from eventlet import patcher from eventlet import patcher
patcher.monkey_patch(all=False, socket=True) patcher.monkey_patch(all=False, socket=True)
def wrap_pipes_with_coroutine_pipes(): def wrap_pipes_with_coroutine_pipes():
warnings.warn("eventlet.util.wrap_pipes_with_coroutine_pipes() is now " warnings.warn("eventlet.util.wrap_pipes_with_coroutine_pipes() is now "
"eventlet.patcher.monkey_patch(all=False, os=True)", "eventlet.patcher.monkey_patch(all=False, os=True)",
DeprecationWarning, stacklevel=2) DeprecationWarning, stacklevel=2)
from eventlet import patcher from eventlet import patcher
patcher.monkey_patch(all=False, os=True) patcher.monkey_patch(all=False, os=True)
def wrap_select_with_coroutine_select(): def wrap_select_with_coroutine_select():
warnings.warn("eventlet.util.wrap_select_with_coroutine_select() is now " warnings.warn("eventlet.util.wrap_select_with_coroutine_select() is now "
"eventlet.patcher.monkey_patch(all=False, select=True)", "eventlet.patcher.monkey_patch(all=False, select=True)",
DeprecationWarning, stacklevel=2) DeprecationWarning, stacklevel=2)
from eventlet import patcher from eventlet import patcher
@@ -92,7 +92,7 @@ def wrap_threading_local_with_coro_local():
Since greenlets cannot cross threads, so this should be semantically Since greenlets cannot cross threads, so this should be semantically
identical to ``threadlocal.local`` identical to ``threadlocal.local``
""" """
warnings.warn("eventlet.util.wrap_threading_local_with_coro_local() is now " warnings.warn("eventlet.util.wrap_threading_local_with_coro_local() is now "
"eventlet.patcher.monkey_patch(all=False, thread=True) -- though" "eventlet.patcher.monkey_patch(all=False, thread=True) -- though"
"note that more than just _local is patched now.", "note that more than just _local is patched now.",
DeprecationWarning, stacklevel=2) DeprecationWarning, stacklevel=2)
@@ -126,4 +126,3 @@ def set_reuse_addr(descriptor):
descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1) descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1)
except socket.error: except socket.error:
pass pass

View File

@@ -15,7 +15,8 @@ DEFAULT_MAX_SIMULTANEOUS_REQUESTS = 1024
DEFAULT_MAX_HTTP_VERSION = 'HTTP/1.1' DEFAULT_MAX_HTTP_VERSION = 'HTTP/1.1'
MAX_REQUEST_LINE = 8192 MAX_REQUEST_LINE = 8192
MINIMUM_CHUNK_SIZE = 4096 MINIMUM_CHUNK_SIZE = 4096
DEFAULT_LOG_FORMAT='%(client_ip)s - - [%(date_time)s] "%(request_line)s" %(status_code)s %(body_length)s %(wall_seconds).6f' DEFAULT_LOG_FORMAT= ('%(client_ip)s - - [%(date_time)s] "%(request_line)s"'
' %(status_code)s %(body_length)s %(wall_seconds).6f')
__all__ = ['server', 'format_date_time'] __all__ = ['server', 'format_date_time']
@@ -32,7 +33,7 @@ def format_date_time(timestamp):
_weekdayname[wd], day, _monthname[month], year, hh, mm, ss _weekdayname[wd], day, _monthname[month], year, hh, mm, ss
) )
# Collections of error codes to compare against. Not all attributes are set # Collections of error codes to compare against. Not all attributes are set
# on errno module on all platforms, so some are literals :( # on errno module on all platforms, so some are literals :(
BAD_SOCK = set((errno.EBADF, 10053)) BAD_SOCK = set((errno.EBADF, 10053))
BROKEN_SOCK = set((errno.EPIPE, errno.ECONNRESET)) BROKEN_SOCK = set((errno.EPIPE, errno.ECONNRESET))
@@ -41,8 +42,8 @@ BROKEN_SOCK = set((errno.EPIPE, errno.ECONNRESET))
ALREADY_HANDLED = object() ALREADY_HANDLED = object()
def get_errno(err): def get_errno(err):
""" Simple method to get the error code out of socket.error objects. It """ Simple method to get the error code out of socket.error objects. It
compensates for some cases where the code is not in the expected compensates for some cases where the code is not in the expected
location.""" location."""
try: try:
return err[0] return err[0]
@@ -50,13 +51,13 @@ def get_errno(err):
return None return None
class Input(object): class Input(object):
def __init__(self, def __init__(self,
rfile, rfile,
content_length, content_length,
wfile=None, wfile=None,
wfile_line=None, wfile_line=None,
chunked_input=False): chunked_input=False):
self.rfile = rfile self.rfile = rfile
if content_length is not None: if content_length is not None:
content_length = int(content_length) content_length = int(content_length)
@@ -136,7 +137,7 @@ class Input(object):
def __iter__(self): def __iter__(self):
return iter(self.read()) return iter(self.read())
def get_socket(self): def get_socket(self):
return self.rfile._sock.dup() return self.rfile._sock.dup()
@@ -144,7 +145,7 @@ class Input(object):
class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.1' protocol_version = 'HTTP/1.1'
minimum_chunk_size = MINIMUM_CHUNK_SIZE minimum_chunk_size = MINIMUM_CHUNK_SIZE
def setup(self): def setup(self):
# overriding SocketServer.setup to correctly handle SSL.Connection objects # overriding SocketServer.setup to correctly handle SSL.Connection objects
conn = self.connection = self.request conn = self.connection = self.request
@@ -173,7 +174,8 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
self.raw_requestline = self.rfile.readline(MAX_REQUEST_LINE) self.raw_requestline = self.rfile.readline(MAX_REQUEST_LINE)
if len(self.raw_requestline) == MAX_REQUEST_LINE: if len(self.raw_requestline) == MAX_REQUEST_LINE:
self.wfile.write( self.wfile.write(
"HTTP/1.0 414 Request URI Too Long\r\nConnection: close\r\nContent-length: 0\r\n\r\n") "HTTP/1.0 414 Request URI Too Long\r\n"
"Connection: close\r\nContent-length: 0\r\n\r\n")
self.close_connection = 1 self.close_connection = 1
return return
except greenio.SSL.ZeroReturnError: except greenio.SSL.ZeroReturnError:
@@ -264,7 +266,7 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
if self.close_connection: if self.close_connection:
towrite.append('Connection: close\r\n') towrite.append('Connection: close\r\n')
elif send_keep_alive: elif send_keep_alive:
towrite.append('Connection: keep-alive\r\n') towrite.append('Connection: keep-alive\r\n')
towrite.append('\r\n') towrite.append('\r\n')
# end of header writing # end of header writing
@@ -277,7 +279,8 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
_writelines(towrite) _writelines(towrite)
length[0] = length[0] + sum(map(len, towrite)) length[0] = length[0] + sum(map(len, towrite))
except UnicodeEncodeError: except UnicodeEncodeError:
print "Encountered unicode while attempting to write wsgi response: ", [x for x in towrite if isinstance(x, unicode)] print "Encountered unicode while attempting to write wsgi response: ", \
[x for x in towrite if isinstance(x, unicode)]
traceback.print_exc() traceback.print_exc()
_writelines( _writelines(
["HTTP/1.0 500 Internal Server Error\r\n", ["HTTP/1.0 500 Internal Server Error\r\n",
@@ -285,7 +288,8 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
"Content-type: text/plain\r\n", "Content-type: text/plain\r\n",
"Content-length: 98\r\n", "Content-length: 98\r\n",
"\r\n", "\r\n",
"Internal Server Error: wsgi application passed a unicode object to the server instead of a string."]) ("Internal Server Error: wsgi application passed "
"a unicode object to the server instead of a string.")])
def start_response(status, response_headers, exc_info=None): def start_response(status, response_headers, exc_info=None):
status_code[0] = status.split()[0] status_code[0] = status.split()[0]
@@ -298,7 +302,8 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
# Avoid dangling circular ref # Avoid dangling circular ref
exc_info = None exc_info = None
capitalized_headers = [('-'.join([x.capitalize() for x in key.split('-')]), value) capitalized_headers = [('-'.join([x.capitalize()
for x in key.split('-')]), value)
for key, value in response_headers] for key, value in response_headers]
headers_set[:] = [status, capitalized_headers] headers_set[:] = [status, capitalized_headers]
@@ -329,17 +334,19 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
write(''.join(towrite)) write(''.join(towrite))
if not headers_sent or (use_chunked[0] and just_written_size): if not headers_sent or (use_chunked[0] and just_written_size):
write('') write('')
except Exception, e: except Exception:
self.close_connection = 1 self.close_connection = 1
exc = traceback.format_exc() exc = traceback.format_exc()
print exc print exc
if not headers_set: if not headers_set:
start_response("500 Internal Server Error", [('Content-type', 'text/plain')]) start_response("500 Internal Server Error",
[('Content-type', 'text/plain')])
write(exc) write(exc)
finally: finally:
if hasattr(result, 'close'): if hasattr(result, 'close'):
result.close() result.close()
if self.environ['eventlet.input'].position < self.environ.get('CONTENT_LENGTH', 0): if (self.environ['eventlet.input'].position
< self.environ.get('CONTENT_LENGTH', 0)):
## Read and discard body if there was no pending 100-continue ## Read and discard body if there was no pending 100-continue
if not self.environ['eventlet.input'].wfile: if not self.environ['eventlet.input'].wfile:
while self.environ['eventlet.input'].read(MINIMUM_CHUNK_SIZE): while self.environ['eventlet.input'].read(MINIMUM_CHUNK_SIZE):
@@ -353,7 +360,7 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
status_code=status_code[0], status_code=status_code[0],
body_length=length[0], body_length=length[0],
wall_seconds=finish - start)) wall_seconds=finish - start))
def get_client_ip(self): def get_client_ip(self):
client_ip = self.client_address[0] client_ip = self.client_address[0]
if self.server.log_x_forwarded_for: if self.server.log_x_forwarded_for:
@@ -422,19 +429,19 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
class Server(BaseHTTPServer.HTTPServer): class Server(BaseHTTPServer.HTTPServer):
def __init__(self, def __init__(self,
socket, socket,
address, address,
app, app,
log=None, log=None,
environ=None, environ=None,
max_http_version=None, max_http_version=None,
protocol=HttpProtocol, protocol=HttpProtocol,
minimum_chunk_size=None, minimum_chunk_size=None,
log_x_forwarded_for=True, log_x_forwarded_for=True,
keepalive=True, keepalive=True,
log_format=DEFAULT_LOG_FORMAT): log_format=DEFAULT_LOG_FORMAT):
self.outstanding_requests = 0 self.outstanding_requests = 0
self.socket = socket self.socket = socket
self.address = address self.address = address
@@ -454,7 +461,6 @@ class Server(BaseHTTPServer.HTTPServer):
self.log_format = log_format self.log_format = log_format
def get_environ(self): def get_environ(self):
socket = self.socket
d = { d = {
'wsgi.errors': sys.stderr, 'wsgi.errors': sys.stderr,
'wsgi.version': (1, 0), 'wsgi.version': (1, 0),
@@ -477,29 +483,29 @@ class Server(BaseHTTPServer.HTTPServer):
try: try:
import ssl import ssl
ACCEPT_EXCEPTIONS = (socket.error, ssl.SSLError) ACCEPT_EXCEPTIONS = (socket.error, ssl.SSLError)
ACCEPT_ERRNO = set((errno.EPIPE, errno.EBADF, ACCEPT_ERRNO = set((errno.EPIPE, errno.EBADF,
ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_SSL)) ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_SSL))
except ImportError: except ImportError:
ACCEPT_EXCEPTIONS = (socket.error,) ACCEPT_EXCEPTIONS = (socket.error,)
ACCEPT_ERRNO = set((errno.EPIPE, errno.EBADF)) ACCEPT_ERRNO = set((errno.EPIPE, errno.EBADF))
def server(sock, site, def server(sock, site,
log=None, log=None,
environ=None, environ=None,
max_size=None, max_size=None,
max_http_version=DEFAULT_MAX_HTTP_VERSION, max_http_version=DEFAULT_MAX_HTTP_VERSION,
protocol=HttpProtocol, protocol=HttpProtocol,
server_event=None, server_event=None,
minimum_chunk_size=None, minimum_chunk_size=None,
log_x_forwarded_for=True, log_x_forwarded_for=True,
custom_pool=None, custom_pool=None,
keepalive=True, keepalive=True,
log_format=DEFAULT_LOG_FORMAT): log_format=DEFAULT_LOG_FORMAT):
""" Start up a wsgi server handling requests from the supplied server """ Start up a wsgi server handling requests from the supplied server
socket. This function loops forever. The *sock* object will be closed after server exits, socket. This function loops forever. The *sock* object will be closed after server exits,
but the underlying file descriptor will remain open, so if you have a dup() of *sock*, but the underlying file descriptor will remain open, so if you have a dup() of *sock*,
it will remain usable. it will remain usable.
:param sock: Server socket, must be already bound to a port and listening. :param sock: Server socket, must be already bound to a port and listening.
:param site: WSGI application function. :param site: WSGI application function.
:param log: File-like object that logs should be written to. If not specified, sys.stderr is used. :param log: File-like object that logs should be written to. If not specified, sys.stderr is used.
@@ -514,11 +520,11 @@ def server(sock, site,
:param keepalive: If set to False, disables keepalives on the server; all connections will be closed after serving one request. :param keepalive: If set to False, disables keepalives on the server; all connections will be closed after serving one request.
:param log_format: A python format string that is used as the template to generate log lines. The following values can be formatted into it: client_ip, date_time, request_line, status_code, body_length, wall_seconds. Look the default for an example of how to use this. :param log_format: A python format string that is used as the template to generate log lines. The following values can be formatted into it: client_ip, date_time, request_line, status_code, body_length, wall_seconds. Look the default for an example of how to use this.
""" """
serv = Server(sock, sock.getsockname(), serv = Server(sock, sock.getsockname(),
site, log, site, log,
environ=None, environ=None,
max_http_version=max_http_version, max_http_version=max_http_version,
protocol=protocol, protocol=protocol,
minimum_chunk_size=minimum_chunk_size, minimum_chunk_size=minimum_chunk_size,
log_x_forwarded_for=log_x_forwarded_for, log_x_forwarded_for=log_x_forwarded_for,
keepalive=keepalive, keepalive=keepalive,
@@ -543,12 +549,13 @@ def server(sock, site,
if port == ':80': if port == ':80':
port = '' port = ''
serv.log.write("(%s) wsgi starting up on %s://%s%s/\n" % (os.getpid(), scheme, host, port)) serv.log.write("(%s) wsgi starting up on %s://%s%s/\n" % (
os.getpid(), scheme, host, port))
while True: while True:
try: try:
client_socket = sock.accept() client_socket = sock.accept()
try: try:
pool.spawn_n(serv.process_request, client_socket) pool.spawn_n(serv.process_request, client_socket)
except AttributeError: except AttributeError:
warnings.warn("wsgi's pool should be an instance of " \ warnings.warn("wsgi's pool should be an instance of " \
"eventlet.greenpool.GreenPool, is %s. Please convert your"\ "eventlet.greenpool.GreenPool, is %s. Please convert your"\
@@ -572,4 +579,3 @@ def server(sock, site,
except socket.error, e: except socket.error, e:
if get_errno(e) not in BROKEN_SOCK: if get_errno(e) not in BROKEN_SOCK:
traceback.print_exc() traceback.print_exc()

View File

@@ -1,11 +1,14 @@
"Test cases for db_pool" "Test cases for db_pool"
import sys
import os
import traceback
from unittest import TestCase, main
from tests import skipped, skip_unless, skip_with_pyevent from tests import skipped, skip_unless, skip_with_pyevent
from unittest import TestCase, main
from eventlet import event from eventlet import event
from eventlet import db_pool from eventlet import db_pool
import eventlet import eventlet
import os
class DBTester(object): class DBTester(object):
__test__ = False # so that nose doesn't try to execute this directly __test__ = False # so that nose doesn't try to execute this directly
@@ -14,19 +17,19 @@ class DBTester(object):
self.connection = None self.connection = None
connection = self._dbmodule.connect(**self._auth) connection = self._dbmodule.connect(**self._auth)
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("""CREATE TABLE gargleblatz cursor.execute("""CREATE TABLE gargleblatz
( (
a INTEGER a INTEGER
);""") );""")
connection.commit() connection.commit()
cursor.close() cursor.close()
def tearDown(self): def tearDown(self):
if self.connection: if self.connection:
self.connection.close() self.connection.close()
self.drop_db() self.drop_db()
def set_up_dummy_table(self, connection = None): def set_up_dummy_table(self, connection=None):
close_connection = False close_connection = False
if connection is None: if connection is None:
close_connection = True close_connection = True
@@ -53,7 +56,7 @@ class DBConnectionPool(DBTester):
super(DBConnectionPool, self).setUp() super(DBConnectionPool, self).setUp()
self.pool = self.create_pool() self.pool = self.create_pool()
self.connection = self.pool.get() self.connection = self.pool.get()
def tearDown(self): def tearDown(self):
if self.connection: if self.connection:
self.pool.put(self.connection) self.pool.put(self.connection)
@@ -84,7 +87,7 @@ class DBConnectionPool(DBTester):
self.assert_(False) self.assert_(False)
except AssertionError: except AssertionError:
raise raise
except Exception, e: except Exception:
pass pass
cursor.close() cursor.close()
@@ -107,7 +110,7 @@ class DBConnectionPool(DBTester):
@skipped @skipped
def test_deletion_does_a_put(self): def test_deletion_does_a_put(self):
# doing a put on del causes some issues if __del__ is called in the # doing a put on del causes some issues if __del__ is called in the
# main coroutine, so, not doing that for now # main coroutine, so, not doing that for now
self.assert_(self.pool.free() == 0) self.assert_(self.pool.free() == 0)
self.connection = None self.connection = None
@@ -144,7 +147,6 @@ class DBConnectionPool(DBTester):
curs.execute(SHORT_QUERY) curs.execute(SHORT_QUERY)
results.append(2) results.append(2)
evt.send() evt.send()
evt2 = event.Event()
eventlet.spawn(a_query) eventlet.spawn(a_query)
results.append(1) results.append(1)
self.assertEqual([1], results) self.assertEqual([1], results)
@@ -201,10 +203,10 @@ class DBConnectionPool(DBTester):
curs.execute("delete from gargleblatz where a=314159") curs.execute("delete from gargleblatz where a=314159")
conn.commit() conn.commit()
self.pool.put(conn) self.pool.put(conn)
@skipped @skipped
def test_two_simultaneous_connections(self): def test_two_simultaneous_connections(self):
# timing-sensitive test, disabled until we come up with a better # timing-sensitive test, disabled until we come up with a better
# way to do this # way to do this
self.pool = self.create_pool(2) self.pool = self.create_pool(2)
conn = self.pool.get() conn = self.pool.get()
@@ -238,36 +240,36 @@ class DBConnectionPool(DBTester):
evt2.wait() evt2.wait()
results.sort() results.sort()
self.assertEqual([1, 2], results) self.assertEqual([1, 2], results)
def test_clear(self): def test_clear(self):
self.pool = self.create_pool() self.pool = self.create_pool()
self.pool.put(self.connection) self.pool.put(self.connection)
self.pool.clear() self.pool.clear()
self.assertEqual(len(self.pool.free_items), 0) self.assertEqual(len(self.pool.free_items), 0)
def test_unwrap_connection(self): def test_unwrap_connection(self):
self.assert_(isinstance(self.connection, self.assert_(isinstance(self.connection,
db_pool.GenericConnectionWrapper)) db_pool.GenericConnectionWrapper))
conn = self.pool._unwrap_connection(self.connection) conn = self.pool._unwrap_connection(self.connection)
self.assert_(not isinstance(conn, db_pool.GenericConnectionWrapper)) self.assert_(not isinstance(conn, db_pool.GenericConnectionWrapper))
self.assertEquals(None, self.pool._unwrap_connection(None)) self.assertEquals(None, self.pool._unwrap_connection(None))
self.assertEquals(None, self.pool._unwrap_connection(1)) self.assertEquals(None, self.pool._unwrap_connection(1))
# testing duck typing here -- as long as the connection has a # testing duck typing here -- as long as the connection has a
# _base attribute, it should be unwrappable # _base attribute, it should be unwrappable
x = Mock() x = Mock()
x._base = 'hi' x._base = 'hi'
self.assertEquals('hi', self.pool._unwrap_connection(x)) self.assertEquals('hi', self.pool._unwrap_connection(x))
conn.close() conn.close()
def test_safe_close(self): def test_safe_close(self):
self.pool._safe_close(self.connection, quiet=True) self.pool._safe_close(self.connection, quiet=True)
self.assertEquals(len(self.pool.free_items), 1) self.assertEquals(len(self.pool.free_items), 1)
self.pool._safe_close(None) self.pool._safe_close(None)
self.pool._safe_close(1) self.pool._safe_close(1)
# now we're really going for 100% coverage # now we're really going for 100% coverage
x = Mock() x = Mock()
def fail(): def fail():
@@ -280,7 +282,7 @@ class DBConnectionPool(DBTester):
raise RuntimeError("if this line has been printed, the test succeeded") raise RuntimeError("if this line has been printed, the test succeeded")
x.close = fail2 x.close = fail2
self.pool._safe_close(x, quiet=False) self.pool._safe_close(x, quiet=False)
def test_zero_max_idle(self): def test_zero_max_idle(self):
self.pool.put(self.connection) self.pool.put(self.connection)
self.pool.clear() self.pool.clear()
@@ -296,10 +298,13 @@ class DBConnectionPool(DBTester):
self.connection = self.pool.get() self.connection = self.pool.get()
self.connection.close() self.connection.close()
self.assertEquals(len(self.pool.free_items), 0) self.assertEquals(len(self.pool.free_items), 0)
@skipped @skipped
def test_max_idle(self): def test_max_idle(self):
# This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while. # This test is timing-sensitive. Rename the function without
# the "dont" to run it, but beware that it could fail or take
# a while.
self.pool = self.create_pool(max_size=2, max_idle=0.02) self.pool = self.create_pool(max_size=2, max_idle=0.02)
self.connection = self.pool.get() self.connection = self.pool.get()
self.connection.close() self.connection.close()
@@ -319,7 +324,10 @@ class DBConnectionPool(DBTester):
@skipped @skipped
def test_max_idle_many(self): def test_max_idle_many(self):
# This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while. # This test is timing-sensitive. Rename the function without
# the "dont" to run it, but beware that it could fail or take
# a while.
self.pool = self.create_pool(max_size=2, max_idle=0.02) self.pool = self.create_pool(max_size=2, max_idle=0.02)
self.connection, conn2 = self.pool.get(), self.pool.get() self.connection, conn2 = self.pool.get(), self.pool.get()
self.connection.close() self.connection.close()
@@ -332,7 +340,10 @@ class DBConnectionPool(DBTester):
@skipped @skipped
def test_max_age(self): def test_max_age(self):
# This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while. # This test is timing-sensitive. Rename the function without
# the "dont" to run it, but beware that it could fail or take
# a while.
self.pool = self.create_pool(max_size=2, max_age=0.05) self.pool = self.create_pool(max_size=2, max_age=0.05)
self.connection = self.pool.get() self.connection = self.pool.get()
self.connection.close() self.connection.close()
@@ -347,7 +358,10 @@ class DBConnectionPool(DBTester):
@skipped @skipped
def test_max_age_many(self): def test_max_age_many(self):
# This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while. # This test is timing-sensitive. Rename the function without
# the "dont" to run it, but beware that it could fail or take
# a while.
self.pool = self.create_pool(max_size=2, max_age=0.15) self.pool = self.create_pool(max_size=2, max_age=0.15)
self.connection, conn2 = self.pool.get(), self.pool.get() self.connection, conn2 = self.pool.get(), self.pool.get()
self.connection.close() self.connection.close()
@@ -366,7 +380,7 @@ class DBConnectionPool(DBTester):
self.pool.put(self.connection) self.pool.put(self.connection)
self.pool.clear() self.pool.clear()
self.pool = self.create_pool(max_size=1, max_age=0) self.pool = self.create_pool(max_size=1, max_age=0)
self.connection = self.pool.get() self.connection = self.pool.get()
self.assertEquals(self.pool.free(), 0) self.assertEquals(self.pool.free(), 0)
self.assertEquals(self.pool.waiting(), 0) self.assertEquals(self.pool.waiting(), 0)
@@ -397,7 +411,7 @@ class DBConnectionPool(DBTester):
def bench(c): def bench(c):
for i in xrange(iterations): for i in xrange(iterations):
c.execute('select 1') c.execute('select 1')
bench(c) # warm-up bench(c) # warm-up
results = [] results = []
for i in xrange(3): for i in xrange(3):
@@ -405,7 +419,7 @@ class DBConnectionPool(DBTester):
bench(c) bench(c)
end = time.time() end = time.time()
results.append(end-start) results.append(end-start)
print "\n%u iterations took an average of %f seconds, (%s) in %s\n" % ( print "\n%u iterations took an average of %f seconds, (%s) in %s\n" % (
iterations, sum(results)/len(results), results, type(self)) iterations, sum(results)/len(results), results, type(self))
@@ -415,29 +429,30 @@ class DBConnectionPool(DBTester):
self.pool = self.create_pool(max_size=1, module=RaisingDBModule()) self.pool = self.create_pool(max_size=1, module=RaisingDBModule())
self.assertRaises(RuntimeError, self.pool.get) self.assertRaises(RuntimeError, self.pool.get)
self.assertEquals(self.pool.free(), 1) self.assertEquals(self.pool.free(), 1)
class RaisingDBModule(object): class RaisingDBModule(object):
def connect(self, *args, **kw): def connect(self, *args, **kw):
raise RuntimeError() raise RuntimeError()
class TpoolConnectionPool(DBConnectionPool): class TpoolConnectionPool(DBConnectionPool):
__test__ = False # so that nose doesn't try to execute this directly __test__ = False # so that nose doesn't try to execute this directly
def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout=0.5, module=None): def create_pool(self, max_size=1, max_idle=10, max_age=10,
connect_timeout=0.5, module=None):
if module is None: if module is None:
module = self._dbmodule module = self._dbmodule
return db_pool.TpooledConnectionPool(module, return db_pool.TpooledConnectionPool(module,
min_size=0, max_size=max_size, min_size=0, max_size=max_size,
max_idle=max_idle, max_age=max_age, max_idle=max_idle, max_age=max_age,
connect_timeout = connect_timeout, connect_timeout = connect_timeout,
**self._auth) **self._auth)
@skip_with_pyevent @skip_with_pyevent
def setUp(self): def setUp(self):
super(TpoolConnectionPool, self).setUp() super(TpoolConnectionPool, self).setUp()
def tearDown(self): def tearDown(self):
super(TpoolConnectionPool, self).tearDown() super(TpoolConnectionPool, self).tearDown()
from eventlet import tpool from eventlet import tpool
@@ -447,19 +462,21 @@ class TpoolConnectionPool(DBConnectionPool):
class RawConnectionPool(DBConnectionPool): class RawConnectionPool(DBConnectionPool):
__test__ = False # so that nose doesn't try to execute this directly __test__ = False # so that nose doesn't try to execute this directly
def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout= 0.5, module=None): def create_pool(self, max_size=1, max_idle=10, max_age=10,
connect_timeout=0.5, module=None):
if module is None: if module is None:
module = self._dbmodule module = self._dbmodule
return db_pool.RawConnectionPool(module, return db_pool.RawConnectionPool(module,
min_size=0, max_size=max_size, min_size=0, max_size=max_size,
max_idle=max_idle, max_age=max_age, max_idle=max_idle, max_age=max_age,
connect_timeout=connect_timeout, connect_timeout=connect_timeout,
**self._auth) **self._auth)
def get_auth(): def get_auth():
"""Looks in the local directory and in the user's home directory for a file named ".test_dbauth", """Looks in the local directory and in the user's home directory
which contains a json map of parameters to the connect function. for a file named ".test_dbauth", which contains a json map of
parameters to the connect function.
""" """
files = [os.path.join(os.path.dirname(__file__), '.test_dbauth'), files = [os.path.join(os.path.dirname(__file__), '.test_dbauth'),
os.path.join(os.path.expanduser('~'), '.test_dbauth')] os.path.join(os.path.expanduser('~'), '.test_dbauth')]
@@ -473,13 +490,14 @@ def get_auth():
return dict([(str(modname), dict([(str(k), str(v)) return dict([(str(modname), dict([(str(k), str(v))
for k, v in connectargs.items()])) for k, v in connectargs.items()]))
for modname, connectargs in auth_utf8.items()]) for modname, connectargs in auth_utf8.items()])
except (IOError, ImportError), e: except (IOError, ImportError):
pass pass
return {'MySQLdb':{'host': 'localhost','user': 'root','passwd': ''}, return {'MySQLdb':{'host': 'localhost','user': 'root','passwd': ''},
'psycopg2':{'user':'test'}} 'psycopg2':{'user':'test'}}
def mysql_requirement(_f): def mysql_requirement(_f):
verbose = os.environ.get('eventlet_test_mysql_verbose')
try: try:
import MySQLdb import MySQLdb
try: try:
@@ -487,26 +505,27 @@ def mysql_requirement(_f):
MySQLdb.connect(**auth) MySQLdb.connect(**auth)
return True return True
except MySQLdb.OperationalError: except MySQLdb.OperationalError:
print "Skipping mysql tests, error when connecting" if verbose:
import traceback print >> sys.stderr, ">> Skipping mysql tests, error when connecting:"
traceback.print_exc() traceback.print_exc()
return False return False
except ImportError: except ImportError:
print "Skipping mysql tests, MySQLdb not importable" if verbose:
print >> sys.stderr, ">> Skipping mysql tests, MySQLdb not importable"
return False return False
class MysqlConnectionPool(object): class MysqlConnectionPool(object):
dummy_table_sql = """CREATE TEMPORARY TABLE test_table dummy_table_sql = """CREATE TEMPORARY TABLE test_table
( (
row_id INTEGER PRIMARY KEY AUTO_INCREMENT, row_id INTEGER PRIMARY KEY AUTO_INCREMENT,
value_int INTEGER, value_int INTEGER,
value_float FLOAT, value_float FLOAT,
value_string VARCHAR(200), value_string VARCHAR(200),
value_uuid CHAR(36), value_uuid CHAR(36),
value_binary BLOB, value_binary BLOB,
value_binary_string VARCHAR(200) BINARY, value_binary_string VARCHAR(200) BINARY,
value_enum ENUM('Y','N'), value_enum ENUM('Y','N'),
created TIMESTAMP created TIMESTAMP
) ENGINE=InnoDB;""" ) ENGINE=InnoDB;"""
@skip_unless(mysql_requirement) @skip_unless(mysql_requirement)
@@ -515,7 +534,7 @@ class MysqlConnectionPool(object):
self._dbmodule = MySQLdb self._dbmodule = MySQLdb
self._auth = get_auth()['MySQLdb'] self._auth = get_auth()['MySQLdb']
super(MysqlConnectionPool, self).setUp() super(MysqlConnectionPool, self).setUp()
def tearDown(self): def tearDown(self):
super(MysqlConnectionPool, self).tearDown() super(MysqlConnectionPool, self).tearDown()
@@ -561,16 +580,16 @@ def postgres_requirement(_f):
class Psycopg2ConnectionPool(object): class Psycopg2ConnectionPool(object):
dummy_table_sql = """CREATE TEMPORARY TABLE test_table dummy_table_sql = """CREATE TEMPORARY TABLE test_table
( (
row_id SERIAL PRIMARY KEY, row_id SERIAL PRIMARY KEY,
value_int INTEGER, value_int INTEGER,
value_float FLOAT, value_float FLOAT,
value_string VARCHAR(200), value_string VARCHAR(200),
value_uuid CHAR(36), value_uuid CHAR(36),
value_binary BYTEA, value_binary BYTEA,
value_binary_string BYTEA, value_binary_string BYTEA,
created TIMESTAMP created TIMESTAMP
);""" );"""
@skip_unless(postgres_requirement) @skip_unless(postgres_requirement)
@@ -600,7 +619,6 @@ class Psycopg2ConnectionPool(object):
def drop_db(self): def drop_db(self):
auth = self._auth.copy() auth = self._auth.copy()
dbname = auth.pop('database')
conn = self._dbmodule.connect(**auth) conn = self._dbmodule.connect(**auth)
conn.set_isolation_level(0) conn.set_isolation_level(0)
db = conn.cursor() db = conn.cursor()

View File

@@ -13,7 +13,7 @@ import array
def bufsized(sock, size=1): def bufsized(sock, size=1):
""" Resize both send and receive buffers on a socket. """ Resize both send and receive buffers on a socket.
Useful for testing trampoline. Returns the socket. Useful for testing trampoline. Returns the socket.
>>> import socket >>> import socket
>>> sock = bufsized(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) >>> sock = bufsized(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
""" """
@@ -40,7 +40,7 @@ class TestGreenIo(LimitedTestCase):
self.assertEqual(e.args[0], 'timed out') self.assertEqual(e.args[0], 'timed out')
except socket.error, e: except socket.error, e:
# unreachable is also a valid outcome # unreachable is also a valid outcome
if e[0] != errno.EHOSTUNREACH: if not e[0] in (errno.EHOSTUNREACH, errno.ENETUNREACH):
raise raise
def test_accept_timeout(self): def test_accept_timeout(self):
@@ -62,7 +62,8 @@ class TestGreenIo(LimitedTestCase):
s.settimeout(0.1) s.settimeout(0.1)
gs = greenio.GreenSocket(s) gs = greenio.GreenSocket(s)
e = gs.connect_ex(('192.0.2.1', 80)) e = gs.connect_ex(('192.0.2.1', 80))
self.assertEquals(e, errno.EAGAIN) if not e in (errno.EHOSTUNREACH, errno.ENETUNREACH):
self.assertEquals(e, errno.EAGAIN)
def test_recv_timeout(self): def test_recv_timeout(self):
listener = greenio.GreenSocket(socket.socket()) listener = greenio.GreenSocket(socket.socket())
@@ -249,16 +250,16 @@ class TestGreenIo(LimitedTestCase):
self.assertRaises(socket.error, conn.send, 'b') self.assertRaises(socket.error, conn.send, 'b')
finally: finally:
listener.close() listener.close()
def did_it_work(server): def did_it_work(server):
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', server.getsockname()[1])) client.connect(('127.0.0.1', server.getsockname()[1]))
fd = client.makefile() fd = client.makefile()
client.close() client.close()
assert fd.readline() == 'hello\n' assert fd.readline() == 'hello\n'
assert fd.read() == '' assert fd.read() == ''
fd.close() fd.close()
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 0)) server.bind(('0.0.0.0', 0))
@@ -266,7 +267,7 @@ class TestGreenIo(LimitedTestCase):
killer = eventlet.spawn(accept_close_early, server) killer = eventlet.spawn(accept_close_early, server)
did_it_work(server) did_it_work(server)
killer.wait() killer.wait()
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 0)) server.bind(('0.0.0.0', 0))
@@ -274,7 +275,7 @@ class TestGreenIo(LimitedTestCase):
killer = eventlet.spawn(accept_close_late, server) killer = eventlet.spawn(accept_close_late, server)
did_it_work(server) did_it_work(server)
killer.wait() killer.wait()
def test_del_closes_socket(self): def test_del_closes_socket(self):
def accept_once(listener): def accept_once(listener):
# delete/overwrite the original conn # delete/overwrite the original conn
@@ -298,11 +299,11 @@ class TestGreenIo(LimitedTestCase):
client.connect(('127.0.0.1', server.getsockname()[1])) client.connect(('127.0.0.1', server.getsockname()[1]))
fd = client.makefile() fd = client.makefile()
client.close() client.close()
assert fd.read() == 'hello\n' assert fd.read() == 'hello\n'
assert fd.read() == '' assert fd.read() == ''
killer.wait() killer.wait()
def test_full_duplex(self): def test_full_duplex(self):
large_data = '*' * 10 * min_buf_size() large_data = '*' * 10 * min_buf_size()
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -313,7 +314,7 @@ class TestGreenIo(LimitedTestCase):
def send_large(sock): def send_large(sock):
sock.sendall(large_data) sock.sendall(large_data)
def read_large(sock): def read_large(sock):
result = sock.recv(len(large_data)) result = sock.recv(len(large_data))
expected = 'hello world' expected = 'hello world'
@@ -332,7 +333,7 @@ class TestGreenIo(LimitedTestCase):
result += sock.recv(10) result += sock.recv(10)
self.assertEquals(result, expected) self.assertEquals(result, expected)
send_large_coro.wait() send_large_coro.wait()
server_evt = eventlet.spawn(server) server_evt = eventlet.spawn(server)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', listener.getsockname()[1])) client.connect(('127.0.0.1', listener.getsockname()[1]))
@@ -343,7 +344,7 @@ class TestGreenIo(LimitedTestCase):
server_evt.wait() server_evt.wait()
large_evt.wait() large_evt.wait()
client.close() client.close()
def test_sendall(self): def test_sendall(self):
# test adapted from Marcus Cavanaugh's email # test adapted from Marcus Cavanaugh's email
# it may legitimately take a while, but will eventually complete # it may legitimately take a while, but will eventually complete
@@ -356,7 +357,7 @@ class TestGreenIo(LimitedTestCase):
sock = bufsized(sock, size=bufsize) sock = bufsized(sock, size=bufsize)
sock.sendall('x'*many_bytes) sock.sendall('x'*many_bytes)
sock.sendall('y'*second_bytes) sock.sendall('y'*second_bytes)
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
listener.bind(("", 0)) listener.bind(("", 0))
@@ -371,20 +372,20 @@ class TestGreenIo(LimitedTestCase):
if data == '': if data == '':
break break
total += len(data) total += len(data)
total2 = 0 total2 = 0
while total < second_bytes: while total < second_bytes:
data = client.recv(second_bytes) data = client.recv(second_bytes)
if data == '': if data == '':
break break
total2 += len(data) total2 += len(data)
sender_coro.wait() sender_coro.wait()
client.close() client.close()
for bytes in (1000, 10000, 100000, 1000000): for bytes in (1000, 10000, 100000, 1000000):
test_sendall_impl(bytes) test_sendall_impl(bytes)
def test_wrap_socket(self): def test_wrap_socket(self):
try: try:
import ssl import ssl
@@ -396,7 +397,7 @@ class TestGreenIo(LimitedTestCase):
sock.bind(('127.0.0.1', 0)) sock.bind(('127.0.0.1', 0))
sock.listen(50) sock.listen(50)
ssl_sock = ssl.wrap_socket(sock) ssl_sock = ssl.wrap_socket(sock)
def test_timeout_and_final_write(self): def test_timeout_and_final_write(self):
# This test verifies that a write on a socket that we've # This test verifies that a write on a socket that we've
# stopped listening for doesn't result in an incorrect switch # stopped listening for doesn't result in an incorrect switch
@@ -405,11 +406,11 @@ class TestGreenIo(LimitedTestCase):
server.bind(('127.0.0.1', 0)) server.bind(('127.0.0.1', 0))
server.listen(50) server.listen(50)
bound_port = server.getsockname()[1] bound_port = server.getsockname()[1]
def sender(evt): def sender(evt):
s2, addr = server.accept() s2, addr = server.accept()
wrap_wfile = s2.makefile() wrap_wfile = s2.makefile()
eventlet.sleep(0.02) eventlet.sleep(0.02)
wrap_wfile.write('hi') wrap_wfile.write('hi')
s2.close() s2.close()
@@ -476,7 +477,7 @@ class TestGreenIoLong(LimitedTestCase):
@skip_with_pyevent @skip_with_pyevent
def test_multiple_readers(self): def test_multiple_readers(self):
recvsize = 2 * min_buf_size() recvsize = 2 * min_buf_size()
sendsize = 10 * recvsize sendsize = 10 * recvsize
# test that we can have multiple coroutines reading # test that we can have multiple coroutines reading
# from the same fd. We make no guarantees about which one gets which # from the same fd. We make no guarantees about which one gets which
# bytes, but they should both get at least some # bytes, but they should both get at least some
@@ -486,7 +487,7 @@ class TestGreenIoLong(LimitedTestCase):
if data == '': if data == '':
break break
results.append(data) results.append(data)
results1 = [] results1 = []
results2 = [] results2 = []
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -516,7 +517,7 @@ class TestGreenIoLong(LimitedTestCase):
listener.close() listener.close()
self.assert_(len(results1) > 0) self.assert_(len(results1) > 0)
self.assert_(len(results2) > 0) self.assert_(len(results2) > 0)
if __name__ == '__main__': if __name__ == '__main__':
main() main()