diff --git a/.hgignore b/.hgignore index 1bd3dbf..6d448ad 100644 --- a/.hgignore +++ b/.hgignore @@ -12,4 +12,7 @@ annotated cover nosetests*.xml .coverage -*,cover \ No newline at end of file +*,cover + +syntax: re +^.ropeproject/.*$ diff --git a/eventlet/coros.py b/eventlet/coros.py index 8bbf716..16217ee 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -1,5 +1,4 @@ import collections -import time import traceback import warnings @@ -16,34 +15,34 @@ class NOT_USED: NOT_USED = NOT_USED() def Event(*a, **kw): - warnings.warn("The Event class has been moved to the event module! " - "Please construct event.Event objects instead.", + warnings.warn("The Event class has been moved to the event module! " + "Please construct event.Event objects instead.", DeprecationWarning, stacklevel=2) return _event.Event(*a, **kw) def event(*a, **kw): warnings.warn("The event class has been capitalized and moved! Please " - "construct event.Event objects instead.", + "construct event.Event objects instead.", DeprecationWarning, stacklevel=2) return _event.Event(*a, **kw) def Semaphore(count): warnings.warn("The Semaphore class has moved! Please " - "use semaphore.Semaphore instead.", + "use semaphore.Semaphore instead.", DeprecationWarning, stacklevel=2) return semaphoremod.Semaphore(count) def BoundedSemaphore(count): warnings.warn("The BoundedSemaphore class has moved! Please " - "use semaphore.BoundedSemaphore instead.", + "use semaphore.BoundedSemaphore instead.", DeprecationWarning, stacklevel=2) return semaphoremod.BoundedSemaphore(count) def semaphore(count=0, limit=None): warnings.warn("coros.semaphore is deprecated. Please use either " - "semaphore.Semaphore or semaphore.BoundedSemaphore instead.", + "semaphore.Semaphore or semaphore.BoundedSemaphore instead.", DeprecationWarning, stacklevel=2) if limit is None: return Semaphore(count) @@ -136,7 +135,7 @@ class Queue(object): def __init__(self): warnings.warn("coros.Queue is deprecated. Please use " - "eventlet.queue.Queue instead.", + "eventlet.queue.Queue instead.", DeprecationWarning, stacklevel=2) self.items = collections.deque() self._waiters = set() @@ -148,7 +147,8 @@ class Queue(object): return len(self.items) def __repr__(self): - params = (self.__class__.__name__, hex(id(self)), len(self.items), len(self._waiters)) + params = (self.__class__.__name__, hex(id(self)), + len(self.items), len(self._waiters)) return '<%s at %s items[%d] _waiters[%s]>' % params def send(self, result=None, exc=None): @@ -195,10 +195,10 @@ class Queue(object): def waiting(self): return len(self._waiters) - + def __iter__(self): return self - + def next(self): return self.wait() @@ -207,7 +207,7 @@ class Channel(object): def __init__(self, max_size=0): warnings.warn("coros.Channel is deprecated. Please use " - "eventlet.queue.Queue(0) instead.", + "eventlet.queue.Queue(0) instead.", DeprecationWarning, stacklevel=2) self.max_size = max_size self.items = collections.deque() @@ -221,7 +221,9 @@ class Channel(object): return len(self.items) def __repr__(self): - params = (self.__class__.__name__, hex(id(self)), self.max_size, len(self.items), len(self._waiters), len(self._senders)) + params = (self.__class__.__name__, hex(id(self)), + self.max_size, len(self.items), + len(self._waiters), len(self._senders)) return '<%s at %s max=%s items[%d] _w[%s] _s[%s]>' % params def send(self, result=None, exc=None): @@ -323,7 +325,7 @@ class Actor(object): to process concurrently. If it is 1, the actor will process messages serially. """ - warnings.warn("We're phasing out the Actor class, so as to get rid of" + warnings.warn("We're phasing out the Actor class, so as to get rid of" "the coros module. If you use Actor, please speak up on " "eventletdev@lists.secondlife.com, and we'll come up with a " "transition plan. If no one speaks up, we'll remove Actor " @@ -397,4 +399,3 @@ class Actor(object): >>> eventlet.kill(a._killer) # test cleanup """ raise NotImplementedError() - diff --git a/eventlet/green/CGIHTTPServer.py b/eventlet/green/CGIHTTPServer.py index 01ea4bf..d322359 100644 --- a/eventlet/green/CGIHTTPServer.py +++ b/eventlet/green/CGIHTTPServer.py @@ -4,6 +4,7 @@ from eventlet.green import SimpleHTTPServer from eventlet.green import urllib from eventlet.green import select +test = None # bind prior to patcher.inject to silence pyflakes warning below patcher.inject('CGIHTTPServer', globals(), ('BaseHTTPServer', BaseHTTPServer), @@ -14,4 +15,4 @@ patcher.inject('CGIHTTPServer', del patcher if __name__ == '__main__': - test() \ No newline at end of file + test() # pyflakes false alarm here unless test = None above diff --git a/eventlet/green/socket.py b/eventlet/green/socket.py index 4af8f05..aeadc0d 100644 --- a/eventlet/green/socket.py +++ b/eventlet/green/socket.py @@ -11,13 +11,13 @@ import os import sys import warnings -__patched__ = ['fromfd', 'socketpair', 'gethostbyname', 'create_connection', +__patched__ = ['fromfd', 'socketpair', 'gethostbyname', 'create_connection', 'ssl', 'socket'] __original_fromfd__ = __socket.fromfd def fromfd(*args): - return socket(__original_fromfd__(*args)) - + return socket(__original_fromfd__(*args)) + __original_socketpair__ = __socket.socketpair def socketpair(*args): one, two = __original_socketpair__(*args) @@ -35,7 +35,7 @@ def gethostbyname(name): globals()['gethostbyname'] = __original_gethostbyname__ else: globals()['gethostbyname'] = _gethostbyname_tpool - + return globals()['gethostbyname'](name) def _gethostbyname_twisted(name): @@ -51,7 +51,7 @@ def _gethostbyname_tpool(name): # def getaddrinfo(*args, **kw): # return tpool.execute( # __socket.getaddrinfo, *args, **kw) -# +# # XXX there're few more blocking functions in socket # XXX having a hub-independent way to access thread pool would be nice @@ -88,10 +88,10 @@ def create_connection(address, timeout=_GLOBAL_DEFAULT_TIMEOUT): def _convert_to_sslerror(ex): """ Transliterates SSL.SysCallErrors to socket.sslerrors""" return sslerror((ex[0], ex[1])) - - + + class GreenSSLObject(object): - """ Wrapper object around the SSLObjects returned by socket.ssl, which have a + """ Wrapper object around the SSLObjects returned by socket.ssl, which have a slightly different interface from SSL.Connection objects. """ def __init__(self, green_ssl_obj): """ Should only be called by a 'green' socket.ssl """ @@ -106,7 +106,7 @@ class GreenSSLObject(object): self.connection.do_handshake() except _SSL.SysCallError, e: raise _convert_to_sslerror(e) - + def read(self, n=1024): """If n is provided, read n bytes from the SSL connection, otherwise read until EOF. The return value is a string of the bytes read.""" @@ -116,9 +116,9 @@ class GreenSSLObject(object): return '' except _SSL.SysCallError, e: raise _convert_to_sslerror(e) - + def write(self, s): - """Writes the string s to the on the object's SSL connection. + """Writes the string s to the on the object's SSL connection. The return value is the number of bytes written. """ try: return self.connection.write(s) @@ -130,13 +130,13 @@ class GreenSSLObject(object): purposes; do not parse the content of this string because its format can't be parsed unambiguously. """ return str(self.connection.get_peer_certificate().get_subject()) - + def issuer(self): """Returns a string describing the issuer of the server's certificate. Useful - for debugging purposes; do not parse the content of this string because its + for debugging purposes; do not parse the content of this string because its format can't be parsed unambiguously.""" return str(self.connection.get_peer_certificate().get_issuer()) - + try: try: diff --git a/eventlet/green/threading.py b/eventlet/green/threading.py index a8c1668..52f9d2c 100644 --- a/eventlet/green/threading.py +++ b/eventlet/green/threading.py @@ -2,7 +2,7 @@ from eventlet import patcher from eventlet.green import thread from eventlet.green import time -__patched__ = ['_start_new_thread', '_allocate_lock', '_get_ident', '_sleep', +__patched__ = ['_start_new_thread', '_allocate_lock', '_get_ident', '_sleep', 'local', 'stack_size'] patcher.inject('threading', @@ -21,6 +21,3 @@ def _patch_main_thread(mod): curthread = mod._active.pop(mod._get_ident(), None) if curthread: mod._active[thread.get_ident()] = curthread - -if __name__ == '__main__': - _test() diff --git a/eventlet/green/time.py b/eventlet/green/time.py index 86d1dff..eede212 100644 --- a/eventlet/green/time.py +++ b/eventlet/green/time.py @@ -3,3 +3,4 @@ for var in dir(__time): exec "%s = __time.%s" % (var, var) __patched__ = ['sleep'] from eventlet.greenthread import sleep +sleep # silence pyflakes diff --git a/eventlet/greenio.py b/eventlet/greenio.py index da26dc6..6829ab8 100644 --- a/eventlet/greenio.py +++ b/eventlet/greenio.py @@ -1,7 +1,4 @@ -import eventlet from eventlet.hubs import trampoline -from eventlet.hubs import get_hub - BUFFER_SIZE = 4096 import errno @@ -12,10 +9,6 @@ import sys import time import warnings - -from errno import EWOULDBLOCK, EAGAIN - - __all__ = ['GreenSocket', 'GreenPipe', 'shutdown_safe'] CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK)) @@ -36,8 +29,8 @@ def socket_connect(descriptor, address): def socket_accept(descriptor): """ - Attempts to accept() on the descriptor, returns a client,address tuple - if it succeeds; returns None if it needs to trampoline, and raises + Attempts to accept() on the descriptor, returns a client,address tuple + if it succeeds; returns None if it needs to trampoline, and raises any exceptions. """ try: @@ -46,7 +39,7 @@ def socket_accept(descriptor): if e[0] == errno.EWOULDBLOCK: return None raise - + if sys.platform[:3]=="win": # winsock sometimes throws ENOTCONN @@ -62,7 +55,7 @@ else: def set_nonblocking(fd): """ Sets the descriptor to be nonblocking. Works on many file-like - objects as well as sockets. Only sockets can be nonblocking on + objects as well as sockets. Only sockets can be nonblocking on Windows, however. """ try: @@ -96,7 +89,7 @@ try: from socket import _GLOBAL_DEFAULT_TIMEOUT except ImportError: _GLOBAL_DEFAULT_TIMEOUT = object() - + class GreenSocket(object): """ @@ -117,14 +110,14 @@ class GreenSocket(object): self.timeout = fd.gettimeout() or socket.getdefaulttimeout() except AttributeError: self.timeout = socket.getdefaulttimeout() - + set_nonblocking(fd) self.fd = fd self.closed = False # when client calls setblocking(0) or settimeout(0) the socket must # act non-blocking self.act_non_blocking = False - + @property def _sock(self): return self @@ -195,11 +188,11 @@ class GreenSocket(object): else: end = time.time() + self.gettimeout() while True: - if socket_connect(fd, address): - return 0 - if time.time() >= end: - raise socket.timeout(errno.EAGAIN) try: + if socket_connect(fd, address): + return 0 + if time.time() >= end: + raise socket.timeout(errno.EAGAIN) trampoline(fd, write=True, timeout=end-time.time(), timeout_exc=socket.timeout(errno.EAGAIN)) except socket.error, ex: @@ -254,9 +247,9 @@ class GreenSocket(object): return '' else: raise - trampoline(fd, - read=True, - timeout=self.timeout, + trampoline(fd, + read=True, + timeout=self.timeout, timeout_exc=socket.timeout("timed out")) def recvfrom(self, *args): @@ -302,7 +295,6 @@ class GreenSocket(object): return total_sent def sendall(self, data, flags=0): - fd = self.fd tail = self.send(data, flags) len_data = len(data) while tail < len_data: @@ -357,11 +349,11 @@ class GreenPipe(object): self.fd = fd self.closed = False self.recvbuffer = '' - + def close(self): self.fd.close() self.closed = True - + def fileno(self): return self.fd.fileno() @@ -375,7 +367,7 @@ class GreenPipe(object): try: return fd.read(buflen) except IOError, e: - if e[0] != EAGAIN: + if e[0] != errno.EAGAIN: return '' except socket.error, e: if e[0] == errno.EPIPE: @@ -407,7 +399,7 @@ class GreenPipe(object): fd.flush() return len(data) except IOError, e: - if e[0] != EAGAIN: + if e[0] != errno.EAGAIN: raise except ValueError, e: # what's this for? @@ -419,7 +411,7 @@ class GreenPipe(object): def flush(self): pass - + def readuntil(self, terminator, size=None): buf, self.recvbuffer = self.recvbuffer, '' checked = 0 @@ -449,7 +441,7 @@ class GreenPipe(object): buf += d chunk, self.recvbuffer = buf[:size], buf[size:] return chunk - + def readline(self, size=None): return self.readuntil(self.newlines, size=size) @@ -484,24 +476,24 @@ except ImportError: class SSL(object): class WantWriteError(object): pass - + class WantReadError(object): pass - + class ZeroReturnError(object): pass - + class SysCallError(object): pass - + def shutdown_safe(sock): """ Shuts down the socket. This is a convenience method for - code that wants to gracefully handle regular sockets, SSL.Connection + code that wants to gracefully handle regular sockets, SSL.Connection sockets from PyOpenSSL and ssl.SSLSocket objects from Python 2.6 interchangeably. Both types of ssl socket require a shutdown() before close, but they have different arity on their shutdown method. - + Regular sockets don't need a shutdown before close, but it doesn't hurt. """ try: @@ -516,11 +508,11 @@ def shutdown_safe(sock): # this will often be the case in an http server context if e[0] != errno.ENOTCONN: raise - - + + def connect(addr, family=socket.AF_INET, bind=None): """Convenience function for opening client sockets. - + :param addr: Address of the server to connect to. For TCP sockets, this is a (host, port) tuple. :param family: Socket family, optional. See :mod:`socket` documentation for available families. :param bind: Local address to bind to, optional. @@ -531,14 +523,14 @@ def connect(addr, family=socket.AF_INET, bind=None): sock.bind(bind) sock.connect(addr) return sock - - + + def listen(addr, family=socket.AF_INET, backlog=50): """Convenience function for opening server sockets. This socket can be used in an ``accept()`` loop. Sets SO_REUSEADDR on the socket to save on annoyance. - + :param addr: Address to listen on. For TCP sockets, this is a (host, port) tuple. :param family: Socket family, optional. See :mod:`socket` documentation for available families. :param backlog: The maximum number of queued connections. Should be at least 1; the maximum value is system-dependent. @@ -552,41 +544,40 @@ def listen(addr, family=socket.AF_INET, backlog=50): def wrap_ssl(sock, keyfile=None, certfile=None, server_side=False, - cert_reqs=None, ssl_version=None, ca_certs=None, + cert_reqs=None, ssl_version=None, ca_certs=None, do_handshake_on_connect=True, suppress_ragged_eofs=True): - """Convenience function for converting a regular socket into an SSL - socket. Has the same interface as :func:`ssl.wrap_socket`, but + """Convenience function for converting a regular socket into an SSL + socket. Has the same interface as :func:`ssl.wrap_socket`, but works on 2.5 or earlier, using PyOpenSSL. The preferred idiom is to call wrap_ssl directly on the creation - method, e.g., ``wrap_ssl(connect(addr))`` or + method, e.g., ``wrap_ssl(connect(addr))`` or ``wrap_ssl(listen(addr), server_side=True)``. This way there is no "naked" socket sitting around to accidentally corrupt the SSL session. - + :return Green SSL object. """ pass def serve(sock, handle, concurrency=1000): - """Runs a server on the supplied socket. Calls the function - *handle* in a separate greenthread for every incoming request. - This function blocks the calling greenthread; it won't return until + """Runs a server on the supplied socket. Calls the function + *handle* in a separate greenthread for every incoming request. + This function blocks the calling greenthread; it won't return until the server completes. If you desire an immediate return, spawn a new greenthread for :func:`serve`. - - The *handle* function must raise an EndServerException to - gracefully terminate the server -- that's the only way to get the + + The *handle* function must raise an EndServerException to + gracefully terminate the server -- that's the only way to get the server() function to return. Any other uncaught exceptions raised - in *handle* are raised as exceptions from :func:`serve`, so be - sure to do a good job catching exceptions that your application + in *handle* are raised as exceptions from :func:`serve`, so be + sure to do a good job catching exceptions that your application raises. The return value of *handle* is ignored. - The value in *concurrency* controls the maximum number of - greenthreads that will be open at any time handling requests. When - the server hits the concurrency limit, it stops accepting new + The value in *concurrency* controls the maximum number of + greenthreads that will be open at any time handling requests. When + the server hits the concurrency limit, it stops accepting new connections until the existing ones complete. """ pass - diff --git a/eventlet/greenpool.py b/eventlet/greenpool.py index ca65401..cc72291 100644 --- a/eventlet/greenpool.py +++ b/eventlet/greenpool.py @@ -10,7 +10,7 @@ from eventlet.support import greenlets as greenlet __all__ = ['GreenPool', 'GreenPile'] DEBUG = False - + try: next except NameError: @@ -28,20 +28,20 @@ class GreenPool(object): self.coroutines_running = set() self.sem = semaphore.Semaphore(size) self.no_coros_running = event.Event() - + def resize(self, new_size): """ Change the max number of greenthreads doing work at any given time. - - If resize is called when there are more than *new_size* greenthreads - already working on tasks, they will be allowed to complete but no new - tasks will be allowed to get launched until enough greenthreads finish - their tasks to drop the overall quantity below *new_size*. Until + + If resize is called when there are more than *new_size* greenthreads + already working on tasks, they will be allowed to complete but no new + tasks will be allowed to get launched until enough greenthreads finish + their tasks to drop the overall quantity below *new_size*. Until then, the return value of free() will be negative. """ - size_delta = new_size - self.size + size_delta = new_size - self.size self.sem.counter += size_delta self.size = new_size - + def running(self): """ Returns the number of greenthreads that are currently executing functions in the Parallel's pool.""" @@ -49,20 +49,20 @@ class GreenPool(object): def free(self): """ Returns the number of greenthreads available for use. - - If zero or less, the next call to :meth:`spawn` or :meth:`spawn_n` will + + If zero or less, the next call to :meth:`spawn` or :meth:`spawn_n` will block the calling greenthread until a slot becomes available.""" return self.sem.counter def spawn(self, function, *args, **kwargs): """Run the *function* with its arguments in its own green thread. - Returns the :class:`GreenThread ` + Returns the :class:`GreenThread ` object that is running the function, which can be used to retrieve the results. - + If the pool is currently at capacity, ``spawn`` will block until one of the running greenthreads completes its task and frees up a slot. - + This function is reentrant; *function* can call ``spawn`` on the same pool without risk of deadlocking the whole thing. """ @@ -82,7 +82,7 @@ class GreenPool(object): self.coroutines_running.add(gt) gt.link(self._spawn_done) return gt - + def _spawn_n_impl(self, func, args, kwargs, coro): try: try: @@ -98,10 +98,10 @@ class GreenPool(object): else: coro = greenthread.getcurrent() self._spawn_done(coro) - + def spawn_n(self, function, *args, **kwargs): - """ Create a greenthread to run the *function*, the same as - :meth:`spawn`. The difference is that :meth:`spawn_n` returns + """ Create a greenthread to run the *function*, the same as + :meth:`spawn`. The difference is that :meth:`spawn_n` returns None; the results of *function* are not retrievable. """ # if reentering an empty pool, don't try to wait on a coroutine freeing @@ -111,7 +111,7 @@ class GreenPool(object): self._spawn_n_impl(function, args, kwargs, None) else: self.sem.acquire() - g = greenthread.spawn_n(self._spawn_n_impl, + g = greenthread.spawn_n(self._spawn_n_impl, function, args, kwargs, True) if not self.coroutines_running: self.no_coros_running = event.Event() @@ -121,7 +121,7 @@ class GreenPool(object): """Waits until all greenthreads in the pool are finished working.""" if self.running(): self.no_coros_running.wait() - + def _spawn_done(self, coro): self.sem.release() if coro is not None: @@ -130,25 +130,25 @@ class GreenPool(object): # we can finish off any waitall() calls that might be pending if self.sem.balance == self.size: self.no_coros_running.send(None) - + def waiting(self): """Return the number of greenthreads waiting to spawn. """ if self.sem.balance < 0: return -self.sem.balance else: - return 0 - + return 0 + def _do_map(self, func, it, gi): for args in it: gi.spawn(func, *args) gi.spawn(return_stop_iteration) - + def starmap(self, function, iterable): - """This is the same as :func:`itertools.starmap`, except that *func* is - executed in a separate green thread for each item, with the concurrency - limited by the pool's size. In operation, starmap consumes a constant - amount of memory, proportional to the size of the pool, and is thus + """This is the same as :func:`itertools.starmap`, except that *func* is + executed in a separate green thread for each item, with the concurrency + limited by the pool's size. In operation, starmap consumes a constant + amount of memory, proportional to the size of the pool, and is thus suited for iterating over extremely long input lists. """ if function is None: @@ -163,22 +163,22 @@ class GreenPool(object): """ return self.starmap(function, itertools.izip(*iterables)) - + def return_stop_iteration(): return StopIteration() - + class GreenPile(object): """GreenPile is an abstraction representing a bunch of I/O-related tasks. - + Construct a GreenPile with an existing GreenPool object. The GreenPile will - then use that pool's concurrency as it processes its jobs. There can be + then use that pool's concurrency as it processes its jobs. There can be many GreenPiles associated with a single GreenPool. - - A GreenPile can also be constructed standalone, not associated with any - GreenPool. To do this, construct it with an integer size parameter instead + + A GreenPile can also be constructed standalone, not associated with any + GreenPool. To do this, construct it with an integer size parameter instead of a GreenPool. - + It is not advisable to iterate over a GreenPile in a different greenthread than the one which is calling spawn. The iterator will exit early in that situation. @@ -191,9 +191,9 @@ class GreenPile(object): self.waiters = queue.LightQueue() self.used = False self.counter = 0 - + def spawn(self, func, *args, **kw): - """Runs *func* in its own green thread, with the result available by + """Runs *func* in its own green thread, with the result available by iterating over the GreenPile object.""" self.used = True self.counter += 1 @@ -203,10 +203,10 @@ class GreenPile(object): except: self.counter -= 1 raise - + def __iter__(self): return self - + def next(self): """Wait for the next result, suspending the current greenthread until it is available. Raises StopIteration when there are no more results.""" @@ -216,15 +216,15 @@ class GreenPile(object): return self.waiters.get().wait() finally: self.counter -= 1 - -# this is identical to GreenPile but it blocks on spawn if the results + +# this is identical to GreenPile but it blocks on spawn if the results # aren't consumed, and it doesn't generate its own StopIteration exception, # instead relying on the spawning process to send one in when it's done class GreenMap(GreenPile): def __init__(self, size_or_pool): super(GreenMap, self).__init__(size_or_pool) self.waiters = queue.LightQueue(maxsize=self.pool.size) - + def next(self): try: val = self.waiters.get().wait() diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py index c2a1145..af33366 100644 --- a/eventlet/hubs/hub.py +++ b/eventlet/hubs/hub.py @@ -21,8 +21,8 @@ class FdListener(object): def __repr__(self): return "%s(%r, %r, %r)" % (type(self).__name__, self.evtype, self.fileno, self.cb) __str__ = __repr__ - - + + # in debug mode, track the call site that created the listener class DebugListener(FdListener): def __init__(self, evtype, fileno, cb): @@ -30,20 +30,21 @@ class DebugListener(FdListener): self.greenlet = greenlet.getcurrent() super(DebugListener, self).__init__(evtype, fileno, cb) def __repr__(self): - return "DebugListener(%r, %r, %r, %r)\n%sEndDebugFdListener" % (self.evtype, - self.fileno, - self.cb, - self.greenlet, - ''.join(self.where_called)) + return "DebugListener(%r, %r, %r, %r)\n%sEndDebugFdListener" % ( + self.evtype, + self.fileno, + self.cb, + self.greenlet, + ''.join(self.where_called)) __str__ = __repr__ - + class BaseHub(object): """ Base hub class for easing the implementation of subclasses that are specific to a particular underlying event architecture. """ SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit) - + READ = READ WRITE = WRITE @@ -58,7 +59,7 @@ class BaseHub(object): self.next_timers = [] self.lclass = FdListener self.debug_exceptions = True - + def add(self, evtype, fileno, cb): """ Signals an intent to or write a particular file descriptor. @@ -81,9 +82,9 @@ class BaseHub(object): pass if listener_list: self.listeners[listener.evtype][listener.fileno] = listener_list - + def remove_descriptor(self, fileno): - """ Completely remove all listeners for this fileno. For internal use + """ Completely remove all listeners for this fileno. For internal use only.""" self.listeners[READ].pop(fileno, None) self.listeners[WRITE].pop(fileno, None) @@ -106,7 +107,7 @@ class BaseHub(object): if self.greenlet.dead: self.greenlet = greenlet.greenlet(self.run) try: - if self.greenlet.parent is not cur: + if self.greenlet.parent is not cur: cur.parent = self.greenlet except ValueError: pass # gets raised if there is a greenlet parent cycle @@ -231,8 +232,6 @@ class BaseHub(object): t = self.timers heappop = heapq.heappop - i = 0 - while t: next = t[0] @@ -265,12 +264,12 @@ class BaseHub(object): def get_timers_count(hub): return max(len(hub.timers), len(hub.next_timers)) - + def set_debug_listeners(self, value): if value: self.lclass = DebugListener else: self.lclass = FdListener - + def set_timer_exceptions(self, value): self.debug_exceptions = value diff --git a/eventlet/hubs/pyevent.py b/eventlet/hubs/pyevent.py index f9d2496..fa1b090 100644 --- a/eventlet/hubs/pyevent.py +++ b/eventlet/hubs/pyevent.py @@ -29,7 +29,7 @@ class event_wrapper(object): if self.impl is not None: self.impl.delete() self.impl = None - + @property def pending(self): return bool(self.impl and self.impl.pending()) @@ -41,9 +41,11 @@ class Hub(BaseHub): def __init__(self): super(Hub,self).__init__() event.init() - + self.signal_exc_info = None - self.signal(2, lambda signalnum, frame: self.greenlet.parent.throw(KeyboardInterrupt)) + self.signal( + 2, + lambda signalnum, frame: self.greenlet.parent.throw(KeyboardInterrupt)) self.events_to_add = [] def dispatch(self): @@ -76,7 +78,8 @@ class Hub(BaseHub): raise except: if self.signal_exc_info is not None: - self.schedule_call_global(0, greenlet.getcurrent().parent.throw, *self.signal_exc_info) + self.schedule_call_global( + 0, greenlet.getcurrent().parent.throw, *self.signal_exc_info) self.signal_exc_info = None else: self.squelch_timer_exception(None, sys.exc_info()) @@ -86,25 +89,25 @@ class Hub(BaseHub): def _getrunning(self): return bool(self.greenlet) - + def _setrunning(self, value): pass # exists for compatibility with BaseHub running = property(_getrunning, _setrunning) def add(self, evtype, fileno, real_cb): - # this is stupid: pyevent won't call a callback unless it's a function, + # this is stupid: pyevent won't call a callback unless it's a function, # so we have to force it to be one here if isinstance(real_cb, types.BuiltinMethodType): def cb(_d): real_cb(_d) else: cb = real_cb - + if evtype is READ: evt = event.read(fileno, cb, fileno) elif evtype is WRITE: evt = event.write(fileno, cb, fileno) - + listener = FdListener(evtype, fileno, evt) self.listeners[evtype].setdefault(fileno, []).append(listener) return listener @@ -117,22 +120,22 @@ class Hub(BaseHub): self.signal_exc_info = sys.exc_info() event.abort() return event_wrapper(event.signal(signalnum, wrapper)) - + def remove(self, listener): super(Hub, self).remove(listener) listener.cb.delete() - + def remove_descriptor(self, fileno): for lcontainer in self.listeners.itervalues(): l_list = lcontainer.pop(fileno, None) for listener in l_list: try: listener.cb.delete() - except SYSTEM_EXCEPTIONS: + except self.SYSTEM_EXCEPTIONS: raise except: traceback.print_exc() - + def schedule_call_local(self, seconds, cb, *args, **kwargs): current = greenlet.getcurrent() if current is self.greenlet: @@ -149,7 +152,7 @@ class Hub(BaseHub): wrapper = event_wrapper(event_impl, seconds=seconds) self.events_to_add.append(wrapper) return wrapper - + def _version_info(self): baseversion = event.__version__ return baseversion @@ -169,4 +172,3 @@ def _scheduled_call_local(event_impl, handle, evtype, arg): cb(*args, **kwargs) finally: event_impl.delete() - diff --git a/eventlet/hubs/twistedr.py b/eventlet/hubs/twistedr.py index 236f634..9c6e78b 100644 --- a/eventlet/hubs/twistedr.py +++ b/eventlet/hubs/twistedr.py @@ -49,7 +49,7 @@ class socket_rwdescriptor(FdListener): super(socket_rwdescriptor, self).__init__(evtype, fileno, cb) if not isinstance(fileno, (int,long)): raise TypeError("Expected int or long, got %s" % type(fileno)) - # Twisted expects fileno to be a callable, not an attribute + # Twisted expects fileno to be a callable, not an attribute def _fileno(): return fileno self.fileno = _fileno @@ -74,8 +74,8 @@ class socket_rwdescriptor(FdListener): # to the mainloop occurs, twisted will not re-evaluate the delayed calls # because it assumes that none were scheduled since no client code was executed # (it has no idea it was switched away). So, we restart the mainloop. - # XXX this is not enough, pollreactor prints the traceback for this and epollreactor - # times out. see test__hub.TestCloseSocketWhilePolling + # XXX this is not enough, pollreactor prints the traceback for + # this and epollreactor times out. see test__hub.TestCloseSocketWhilePolling raise greenlet.GreenletExit logstr = "twistedr" @@ -95,7 +95,7 @@ class BaseTwistedHub(object): # XXX: remove me from here. make functions that depend on reactor # XXX: hub's methods uses_twisted_reactor = True - + WRITE = WRITE READ = READ @@ -103,9 +103,10 @@ class BaseTwistedHub(object): self.greenlet = mainloop_greenlet def switch(self): - assert api.getcurrent() is not self.greenlet, "Cannot switch from MAINLOOP to MAINLOOP" + assert getcurrent() is not self.greenlet, \ + "Cannot switch from MAINLOOP to MAINLOOP" try: - api.getcurrent().parent = self.greenlet + getcurrent().parent = self.greenlet except ValueError: pass return self.greenlet.switch() @@ -127,14 +128,15 @@ class BaseTwistedHub(object): from twisted.internet import reactor reactor.removeReader(descriptor) reactor.removeWriter(descriptor) - + def schedule_call_local(self, seconds, func, *args, **kwargs): from twisted.internet import reactor def call_if_greenlet_alive(*args1, **kwargs1): if timer.greenlet.dead: return return func(*args1, **kwargs1) - timer = callLater(LocalDelayedCall, reactor, seconds, call_if_greenlet_alive, *args, **kwargs) + timer = callLater(LocalDelayedCall, reactor, seconds, + call_if_greenlet_alive, *args, **kwargs) return timer schedule_call = schedule_call_local @@ -189,18 +191,22 @@ class TwistedHub(BaseTwistedHub): installSignalHandlers = False def __init__(self): - assert Hub.state==0, ('%s hub can only be instantiated once' % type(self).__name__, Hub.state) + assert Hub.state==0, ('%s hub can only be instantiated once'%type(self).__name__, + Hub.state) Hub.state = 1 - make_twisted_threadpool_daemonic() # otherwise the program would hang after the main greenlet exited - g = api.Greenlet(self.run) + make_twisted_threadpool_daemonic() # otherwise the program + # would hang after the main + # greenlet exited + g = greenlet.greenlet(self.run) BaseTwistedHub.__init__(self, g) def switch(self): - assert api.getcurrent() is not self.greenlet, "Cannot switch from MAINLOOP to MAINLOOP" + assert getcurrent() is not self.greenlet, \ + "Cannot switch from MAINLOOP to MAINLOOP" if self.greenlet.dead: - self.greenlet = api.Greenlet(self.run) + self.greenlet = greenlet.greenlet(self.run) try: - api.getcurrent().parent = self.greenlet + getcurrent().parent = self.greenlet except ValueError: pass return self.greenlet.switch() @@ -255,5 +261,3 @@ def make_twisted_threadpool_daemonic(): from twisted.python.threadpool import ThreadPool if ThreadPool.threadFactory != DaemonicThread: ThreadPool.threadFactory = DaemonicThread - - diff --git a/eventlet/patcher.py b/eventlet/patcher.py index 1383d87..ecb67d9 100644 --- a/eventlet/patcher.py +++ b/eventlet/patcher.py @@ -7,17 +7,17 @@ __exclude = set(('__builtins__', '__file__', '__name__')) def inject(module_name, new_globals, *additional_modules): """Base method for "injecting" greened modules into an imported module. It - imports the module specified in *module_name*, arranging things so - that the already-imported modules in *additional_modules* are used when + imports the module specified in *module_name*, arranging things so + that the already-imported modules in *additional_modules* are used when *module_name* makes its imports. - - *new_globals* is either None or a globals dictionary that gets populated + + *new_globals* is either None or a globals dictionary that gets populated with the contents of the *module_name* module. This is useful when creating a "green" version of some other module. - + *additional_modules* should be a collection of two-element tuples, of the - form (, ). If it's not specified, a default selection of - name/module pairs is used, which should cover all use cases but may be + form (, ). If it's not specified, a default selection of + name/module pairs is used, which should cover all use cases but may be slower because there are inevitably redundant or unnecessary imports. """ if not additional_modules: @@ -26,16 +26,17 @@ def inject(module_name, new_globals, *additional_modules): _green_os_modules() + _green_select_modules() + _green_socket_modules() + - _green_thread_modules() + + _green_thread_modules() + _green_time_modules()) - + ## Put the specified modules in sys.modules for the duration of the import saved = {} for name, mod in additional_modules: saved[name] = sys.modules.get(name, None) sys.modules[name] = mod - ## Remove the old module from sys.modules and reimport it while the specified modules are in place + ## Remove the old module from sys.modules and reimport it while + ## the specified modules are in place old_module = sys.modules.pop(module_name, None) try: module = __import__(module_name, {}, {}, module_name.split('.')[:-1]) @@ -66,20 +67,20 @@ def inject(module_name, new_globals, *additional_modules): def import_patched(module_name, *additional_modules, **kw_additional_modules): - """Imports a module in a way that ensures that the module uses "green" - versions of the standard library modules, so that everything works + """Imports a module in a way that ensures that the module uses "green" + versions of the standard library modules, so that everything works nonblockingly. - + The only required argument is the name of the module to be imported. """ return inject( - module_name, - None, - *additional_modules + tuple(kw_additional_modules.items())) + module_name, + None, + *additional_modules + tuple(kw_additional_modules.items())) def patch_function(func, *additional_modules): - """Huge hack here -- patches the specified modules for the + """Huge hack here -- patches the specified modules for the duration of the function call.""" if not additional_modules: # supply some defaults @@ -105,7 +106,7 @@ def patch_function(func, *additional_modules): else: del sys.modules[name] return patched - + _originals = {} def original(modname): mod = _originals.get(modname) @@ -122,18 +123,18 @@ def original(modname): return _originals.get(modname) already_patched = {} -def monkey_patch(all=True, os=False, select=False, +def monkey_patch(all=True, os=False, select=False, socket=False, thread=False, time=False): """Globally patches certain system modules to be greenthread-friendly. - + The keyword arguments afford some control over which modules are patched. - If *all* is True, then all modules are patched regardless of the other + If *all* is True, then all modules are patched regardless of the other arguments. If it's False, then the rest of the keyword arguments control patching of specific subsections of the standard library. Most patch the single module of the same name (os, time, select). The exceptions are socket, which also patches the ssl module if present; and thread, which patches thread, threading, and Queue. - + It's safe to call monkey_patch multiple times. """ modules_to_patch = [] @@ -144,7 +145,7 @@ def monkey_patch(all=True, os=False, select=False, modules_to_patch += _green_select_modules() already_patched['select'] = True if all or socket and not already_patched.get('socket'): - modules_to_patch += _green_socket_modules() + modules_to_patch += _green_socket_modules() already_patched['socket'] = True if all or thread and not already_patched.get('thread'): # hacks ahead @@ -156,19 +157,20 @@ def monkey_patch(all=True, os=False, select=False, if all or time and not already_patched.get('time'): modules_to_patch += _green_time_modules() already_patched['time'] = True - + for name, mod in modules_to_patch: orig_mod = sys.modules.get(name) - for attr in mod.__patched__: - orig_attr = getattr(orig_mod, attr, None) - patched_attr = getattr(mod, attr, None) + for attr_name in mod.__patched__: + #orig_attr = getattr(orig_mod, attr_name, None) + # @@tavis: line above wasn't used, not sure what author intended + patched_attr = getattr(mod, attr_name, None) if patched_attr is not None: - setattr(orig_mod, attr, patched_attr) + setattr(orig_mod, attr_name, patched_attr) def _green_os_modules(): from eventlet.green import os return [('os', os)] - + def _green_select_modules(): from eventlet.green import select return [('select', select)] @@ -186,7 +188,7 @@ def _green_thread_modules(): from eventlet.green import thread from eventlet.green import threading return [('Queue', Queue), ('thread', thread), ('threading', threading)] - + def _green_time_modules(): from eventlet.green import time return [('time', time)] diff --git a/eventlet/pools.py b/eventlet/pools.py index 73d8238..6f72285 100644 --- a/eventlet/pools.py +++ b/eventlet/pools.py @@ -10,7 +10,7 @@ try: exec(''' @contextmanager def item_impl(self): - """ Get an object out of the pool, for use with with statement. + """ Get an object out of the pool, for use with with statement. >>> from eventlet import pools >>> pool = pools.TokenPool(max_size=4) @@ -35,20 +35,20 @@ except ImportError: class Pool(object): """ Pool is a base class that implements resource limitation and construction. - It is meant to be subclassed. When subclassing, define only + It is meant to be subclassed. When subclassing, define only the :meth:`create` method to implement the desired resource:: - + class MyPool(pools.Pool): def create(self): return MyObject() - + If using 2.5 or greater, the :meth:`item` method acts as a context manager; that's the best way to use it:: - + with mypool.item() as thing: thing.dostuff() - - If stuck on 2.4, the :meth:`get` and :meth:`put` methods are the preferred + + If stuck on 2.4, the :meth:`get` and :meth:`put` methods are the preferred nomenclature. Use a ``finally`` to ensure that nothing is leaked:: thing = self.pool.get() @@ -59,12 +59,12 @@ class Pool(object): The maximum size of the pool can be modified at runtime via the :meth:`resize` method. - - Specifying a non-zero *min-size* argument pre-populates the pool with - *min_size* items. *max-size* sets a hard limit to the size of the pool -- - it cannot contain any more items than *max_size*, and if there are already - *max_size* items 'checked out' of the pool, the pool will cause any - greenthread calling :meth:`get` to cooperatively yield until an item + + Specifying a non-zero *min-size* argument pre-populates the pool with + *min_size* items. *max-size* sets a hard limit to the size of the pool -- + it cannot contain any more items than *max_size*, and if there are already + *max_size* items 'checked out' of the pool, the pool will cause any + greenthread calling :meth:`get` to cooperatively yield until an item is :meth:`put` in. """ def __init__(self, min_size=0, max_size=4, order_as_stack=False): @@ -96,7 +96,7 @@ class Pool(object): self.current_size += 1 return created return self.channel.get() - + if item_impl is not None: item = item_impl @@ -118,11 +118,11 @@ class Pool(object): def resize(self, new_size): """Resize the pool to *new_size*. - - Adjusting this number does not affect existing items checked out of - the pool, nor on any greenthreads who are waiting for an item to free + + Adjusting this number does not affect existing items checked out of + the pool, nor on any greenthreads who are waiting for an item to free up. Some indeterminate number of :meth:`get`/:meth:`put` - cycles will be necessary before the new maximum size truly matches + cycles will be necessary before the new maximum size truly matches the actual operation of the pool. """ self.max_size = new_size @@ -137,18 +137,18 @@ class Pool(object): """Return the number of routines waiting for a pool item. """ return max(0, self.channel.getting() - self.channel.putting()) - + def create(self): """Generate a new pool item. This method must be overridden in order for the pool to function. It accepts no arguments and returns a single instance of whatever thing the pool is supposed to contain. - - In general, :meth:`create` is called whenever the pool exceeds its - previous high-water mark of concurrently-checked-out-items. In other - words, in a new pool with *min_size* of 0, the very first call - to :meth:`get` will result in a call to :meth:`create`. If the first - caller calls :meth:`put` before some other caller calls :meth:`get`, - then the first item will be returned, and :meth:`create` will not be + + In general, :meth:`create` is called whenever the pool exceeds its + previous high-water mark of concurrently-checked-out-items. In other + words, in a new pool with *min_size* of 0, the very first call + to :meth:`get` will result in a call to :meth:`create`. If the first + caller calls :meth:`put` before some other caller calls :meth:`get`, + then the first item will be returned, and :meth:`create` will not be called a second time. """ raise NotImplementedError("Implement in subclass") @@ -165,5 +165,3 @@ class TokenPool(Pool): """ def create(self): return Token() - - diff --git a/eventlet/processes.py b/eventlet/processes.py index d79cf25..bcf2a5f 100644 --- a/eventlet/processes.py +++ b/eventlet/processes.py @@ -1,5 +1,5 @@ import warnings -warnings.warn("eventlet.processes is deprecated in favor of " +warnings.warn("eventlet.processes is deprecated in favor of " "eventlet.green.subprocess, which is API-compatible with the standard " " library subprocess module.", DeprecationWarning, stacklevel=2) @@ -11,7 +11,6 @@ import popen2 import signal from eventlet import api -from eventlet import coros from eventlet import pools from eventlet import greenio @@ -69,7 +68,9 @@ class Process(object): greenio.set_nonblocking(child_stdout_stderr) greenio.set_nonblocking(child_stdin) self.child_stdout_stderr = greenio.GreenPipe(child_stdout_stderr) - self.child_stdout_stderr.newlines = '\n' # the default is \r\n, which aren't sent over pipes + self.child_stdout_stderr.newlines = '\n' # the default is + # \r\n, which aren't sent over + # pipes self.child_stdin = greenio.GreenPipe(child_stdin) self.child_stdin.newlines = '\n' diff --git a/eventlet/saranwrap.py b/eventlet/saranwrap.py index 5838253..3587935 100644 --- a/eventlet/saranwrap.py +++ b/eventlet/saranwrap.py @@ -36,7 +36,10 @@ def wrap(obj, dead_callback = None): return wrap_module(obj.__name__, dead_callback) pythonpath_sync() if _g_debug_mode: - p = Process(sys.executable, ["-W", "ignore", __file__, '--child', '--logfile', os.path.join(tempfile.gettempdir(), 'saranwrap.log')], dead_callback) + p = Process(sys.executable, + ["-W", "ignore", __file__, '--child', + '--logfile', os.path.join(tempfile.gettempdir(), 'saranwrap.log')], + dead_callback) else: p = Process(sys.executable, ["-W", "ignore", __file__, '--child'], dead_callback) prox = Proxy(ChildProcess(p, p)) @@ -53,9 +56,13 @@ def wrap_module(fqname, dead_callback = None): pythonpath_sync() global _g_debug_mode if _g_debug_mode: - p = Process(sys.executable, ["-W", "ignore", __file__, '--module', fqname, '--logfile', os.path.join(tempfile.gettempdir(), 'saranwrap.log')], dead_callback) + p = Process(sys.executable, + ["-W", "ignore", __file__, '--module', fqname, + '--logfile', os.path.join(tempfile.gettempdir(), 'saranwrap.log')], + dead_callback) else: - p = Process(sys.executable, ["-W", "ignore", __file__, '--module', fqname,], dead_callback) + p = Process(sys.executable, + ["-W", "ignore", __file__, '--module', fqname,], dead_callback) prox = Proxy(ChildProcess(p,p)) return prox @@ -140,7 +147,8 @@ def _write_request(param, output): def _is_local(attribute): "Return ``True`` if the attribute should be handled locally" -# return attribute in ('_in', '_out', '_id', '__getattribute__', '__setattr__', '__dict__') +# return attribute in ('_in', '_out', '_id', '__getattribute__', +# '__setattr__', '__dict__') # good enough for now. :) if '__local_dict' in attribute: return True @@ -266,7 +274,8 @@ class Proxy(object): my_cp = self.__local_dict['_cp'] my_id = self.__local_dict['_id'] # Pass the set attribute across - request = Request('setattr', {'id':my_id, 'attribute':attribute, 'value':value}) + request = Request('setattr', + {'id':my_id, 'attribute':attribute, 'value':value}) return my_cp.make_request(request, attribute=attribute) class ObjectProxy(Proxy): @@ -324,7 +333,8 @@ class ObjectProxy(Proxy): return self.__str__() def __nonzero__(self): - # bool(obj) is another method that skips __getattribute__. There's no good way to just pass + # bool(obj) is another method that skips __getattribute__. + # There's no good way to just pass # the method on, so we use a special message. my_cp = self.__local_dict['_cp'] my_id = self.__local_dict['_id'] @@ -395,7 +405,9 @@ class CallableProxy(object): # having already checked if the method starts with '_' so we # can safely pass this one to the remote object. #_prnt("calling %s %s" % (self._object_id, self._name) - request = Request('call', {'id':self._object_id, 'name':self._name, 'args':args, 'kwargs':kwargs}) + request = Request('call', {'id':self._object_id, + 'name':self._name, + 'args':args, 'kwargs':kwargs}) return self._cp.make_request(request, attribute=self._name) class Server(object): @@ -444,14 +456,15 @@ class Server(object): def handle_setitem(self, obj, req): obj[req['key']] = req['value'] - return None # *TODO figure out what the actual return value of __setitem__ should be + return None # *TODO figure out what the actual return value + # of __setitem__ should be def handle_eq(self, obj, req): #_log("__eq__ %s %s" % (obj, req)) rhs = None try: rhs = self._objects[req['rhs']] - except KeyError, e: + except KeyError: return False return (obj == rhs) @@ -565,7 +578,7 @@ class Server(object): #_log("objects: %s" % self._objects) s = Pickle.dumps(body) _log(`s`) - str_ = _write_lp_hunk(self._out, s) + _write_lp_hunk(self._out, s) def write_exception(self, e): """Helper method to respond with an exception.""" @@ -621,14 +634,16 @@ def named(name): import_err_strings.append(err.__str__()) toimport = '.'.join(toimport.split('.')[:-1]) if obj is None: - raise ImportError('%s could not be imported. Import errors: %r' % (name, import_err_strings)) + raise ImportError( + '%s could not be imported. Import errors: %r' % (name, import_err_strings)) for seg in name.split('.')[1:]: try: obj = getattr(obj, seg) except AttributeError: dirobj = dir(obj) dirobj.sort() - raise AttributeError('attribute %r missing from %r (%r) %r. Import errors: %r' % ( + raise AttributeError( + 'attribute %r missing from %r (%r) %r. Import errors: %r' % ( seg, obj, dirobj, name, import_err_strings)) return obj diff --git a/eventlet/support/greenlets.py b/eventlet/support/greenlets.py index cb75667..1d42858 100644 --- a/eventlet/support/greenlets.py +++ b/eventlet/support/greenlets.py @@ -17,5 +17,6 @@ except ImportError, e: except ImportError: try: from support.stacklesss import greenlet, getcurrent, GreenletExit + (greenlet, getcurrent, GreenletExit) # silence pyflakes except ImportError, e: raise ImportError("Unable to find an implementation of greenlet.") diff --git a/eventlet/support/stacklesss.py b/eventlet/support/stacklesss.py index 8961204..2531438 100644 --- a/eventlet/support/stacklesss.py +++ b/eventlet/support/stacklesss.py @@ -30,7 +30,7 @@ class FirstSwitch(object): gr.t = t tasklet_to_greenlet[t] = gr t.setup(*args, **kw) - result = t.run() + t.run() class greenlet(object): @@ -75,10 +75,10 @@ def emulate(): module.getcurrent = getcurrent module.GreenletExit = GreenletExit - caller = t = stackless.getcurrent() - tasklet_to_greenlet[t] = None + caller = stackless.getcurrent() + tasklet_to_greenlet[caller] = None main_coro = greenlet() - tasklet_to_greenlet[t] = main_coro - main_coro.t = t + tasklet_to_greenlet[caller] = main_coro + main_coro.t = caller del main_coro.switch ## It's already running coro_args[main_coro] = None diff --git a/eventlet/tpool.py b/eventlet/tpool.py index a2926a8..5b8ab42 100644 --- a/eventlet/tpool.py +++ b/eventlet/tpool.py @@ -33,7 +33,7 @@ _rfile = _wfile = None def _signal_t2e(): _wfile.write(' ') _wfile.flush() - + _reqq = None _rspq = None @@ -74,9 +74,13 @@ def tworker(): rv = meth(*args,**kwargs) except SYS_EXCS: raise - except Exception,exn: + except Exception: rv = sys.exc_info() - _rspq.put((e,rv)) + _rspq.put((e,rv)) # @@tavis: not supposed to + # keep references to + # sys.exc_info() so it would + # be worthwhile testing + # if this leads to memory leaks meth = args = kwargs = e = rv = None _signal_t2e() @@ -118,10 +122,10 @@ def proxy_call(autowrap, f, *args, **kwargs): """ Call a function *f* and returns the value. If the type of the return value is in the *autowrap* collection, then it is wrapped in a :class:`Proxy` - object before return. - + object before return. + Normally *f* will be called in the threadpool with :func:`execute`; if the - keyword argument "nonblocking" is set to ``True``, it will simply be + keyword argument "nonblocking" is set to ``True``, it will simply be executed directly. This is useful if you have an object which has methods that don't need to be called in a separate thread, but which return objects that should be Proxy wrapped. @@ -242,7 +246,7 @@ def setup(): _threads.add(t) _coro = greenthread.spawn_n(tpool_trampoline) - + def killall(): global _setup_already, _reqq, _rspq, _rfile, _wfile diff --git a/eventlet/twistedutil/join_reactor.py b/eventlet/twistedutil/join_reactor.py index f1f9470..4e9ccc2 100644 --- a/eventlet/twistedutil/join_reactor.py +++ b/eventlet/twistedutil/join_reactor.py @@ -4,10 +4,10 @@ You generally don't have to use it unless you need to call reactor.run() yourself. """ from eventlet.hubs.twistedr import BaseTwistedHub -from eventlet.api import use_hub, _threadlocal +from eventlet import use_hub from eventlet.support import greenlets as greenlet +from eventlet.hubs import _threadlocal use_hub(BaseTwistedHub) assert not hasattr(_threadlocal, 'hub') hub = _threadlocal.hub = _threadlocal.Hub(greenlet.getcurrent()) - diff --git a/eventlet/util.py b/eventlet/util.py index 4c86ef2..c658df5 100644 --- a/eventlet/util.py +++ b/eventlet/util.py @@ -1,12 +1,11 @@ -import os import socket -import errno import warnings -from eventlet import greenio - def g_log(*args): - warnings.warn("eventlet.util.g_log is deprecated because we're pretty sure no one uses it. Send mail to eventletdev@lists.secondlife.com if you are actually using it.", + warnings.warn("eventlet.util.g_log is deprecated because " + "we're pretty sure no one uses it. " + "Send mail to eventletdev@lists.secondlife.com " + "if you are actually using it.", DeprecationWarning, stacklevel=2) import sys from eventlet.support import greenlets as greenlet @@ -42,21 +41,22 @@ try: server_side=server_side, cert_reqs=ssl.CERT_NONE, ssl_version=ssl.PROTOCOL_SSLv23, ca_certs=None, do_handshake_on_connect=True, - suppress_ragged_eofs=True) + suppress_ragged_eofs=True) except ImportError: # if ssl is not available, use PyOpenSSL def wrap_ssl(sock, certificate=None, private_key=None, server_side=False): try: from eventlet.green.OpenSSL import SSL except ImportError: - raise ImportError("To use SSL with Eventlet, you must install PyOpenSSL or use Python 2.6 or later.") + raise ImportError("To use SSL with Eventlet, " + "you must install PyOpenSSL or use Python 2.6 or later.") context = SSL.Context(SSL.SSLv23_METHOD) if certificate is not None: context.use_certificate_file(certificate) if private_key is not None: context.use_privatekey_file(private_key) context.set_verify(SSL.VERIFY_NONE, lambda *x: True) - + connection = SSL.Connection(context, sock) if server_side: connection.set_accept_state() @@ -65,22 +65,22 @@ except ImportError: return connection def wrap_socket_with_coroutine_socket(use_thread_pool=None): - warnings.warn("eventlet.util.wrap_socket_with_coroutine_socket() is now " + warnings.warn("eventlet.util.wrap_socket_with_coroutine_socket() is now " "eventlet.patcher.monkey_patch(all=False, socket=True)", DeprecationWarning, stacklevel=2) from eventlet import patcher patcher.monkey_patch(all=False, socket=True) - + def wrap_pipes_with_coroutine_pipes(): - warnings.warn("eventlet.util.wrap_pipes_with_coroutine_pipes() is now " + warnings.warn("eventlet.util.wrap_pipes_with_coroutine_pipes() is now " "eventlet.patcher.monkey_patch(all=False, os=True)", DeprecationWarning, stacklevel=2) from eventlet import patcher patcher.monkey_patch(all=False, os=True) def wrap_select_with_coroutine_select(): - warnings.warn("eventlet.util.wrap_select_with_coroutine_select() is now " + warnings.warn("eventlet.util.wrap_select_with_coroutine_select() is now " "eventlet.patcher.monkey_patch(all=False, select=True)", DeprecationWarning, stacklevel=2) from eventlet import patcher @@ -92,7 +92,7 @@ def wrap_threading_local_with_coro_local(): Since greenlets cannot cross threads, so this should be semantically identical to ``threadlocal.local`` """ - warnings.warn("eventlet.util.wrap_threading_local_with_coro_local() is now " + warnings.warn("eventlet.util.wrap_threading_local_with_coro_local() is now " "eventlet.patcher.monkey_patch(all=False, thread=True) -- though" "note that more than just _local is patched now.", DeprecationWarning, stacklevel=2) @@ -126,4 +126,3 @@ def set_reuse_addr(descriptor): descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1) except socket.error: pass - diff --git a/eventlet/wsgi.py b/eventlet/wsgi.py index 1eebd7e..e637f99 100644 --- a/eventlet/wsgi.py +++ b/eventlet/wsgi.py @@ -15,7 +15,8 @@ DEFAULT_MAX_SIMULTANEOUS_REQUESTS = 1024 DEFAULT_MAX_HTTP_VERSION = 'HTTP/1.1' MAX_REQUEST_LINE = 8192 MINIMUM_CHUNK_SIZE = 4096 -DEFAULT_LOG_FORMAT='%(client_ip)s - - [%(date_time)s] "%(request_line)s" %(status_code)s %(body_length)s %(wall_seconds).6f' +DEFAULT_LOG_FORMAT= ('%(client_ip)s - - [%(date_time)s] "%(request_line)s"' + ' %(status_code)s %(body_length)s %(wall_seconds).6f') __all__ = ['server', 'format_date_time'] @@ -32,7 +33,7 @@ def format_date_time(timestamp): _weekdayname[wd], day, _monthname[month], year, hh, mm, ss ) -# Collections of error codes to compare against. Not all attributes are set +# Collections of error codes to compare against. Not all attributes are set # on errno module on all platforms, so some are literals :( BAD_SOCK = set((errno.EBADF, 10053)) BROKEN_SOCK = set((errno.EPIPE, errno.ECONNRESET)) @@ -41,8 +42,8 @@ BROKEN_SOCK = set((errno.EPIPE, errno.ECONNRESET)) ALREADY_HANDLED = object() def get_errno(err): - """ Simple method to get the error code out of socket.error objects. It - compensates for some cases where the code is not in the expected + """ Simple method to get the error code out of socket.error objects. It + compensates for some cases where the code is not in the expected location.""" try: return err[0] @@ -50,13 +51,13 @@ def get_errno(err): return None class Input(object): - def __init__(self, - rfile, - content_length, - wfile=None, + def __init__(self, + rfile, + content_length, + wfile=None, wfile_line=None, chunked_input=False): - + self.rfile = rfile if content_length is not None: content_length = int(content_length) @@ -136,7 +137,7 @@ class Input(object): def __iter__(self): return iter(self.read()) - + def get_socket(self): return self.rfile._sock.dup() @@ -144,7 +145,7 @@ class Input(object): class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): protocol_version = 'HTTP/1.1' minimum_chunk_size = MINIMUM_CHUNK_SIZE - + def setup(self): # overriding SocketServer.setup to correctly handle SSL.Connection objects conn = self.connection = self.request @@ -173,7 +174,8 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): self.raw_requestline = self.rfile.readline(MAX_REQUEST_LINE) if len(self.raw_requestline) == MAX_REQUEST_LINE: self.wfile.write( - "HTTP/1.0 414 Request URI Too Long\r\nConnection: close\r\nContent-length: 0\r\n\r\n") + "HTTP/1.0 414 Request URI Too Long\r\n" + "Connection: close\r\nContent-length: 0\r\n\r\n") self.close_connection = 1 return except greenio.SSL.ZeroReturnError: @@ -264,7 +266,7 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): if self.close_connection: towrite.append('Connection: close\r\n') elif send_keep_alive: - towrite.append('Connection: keep-alive\r\n') + towrite.append('Connection: keep-alive\r\n') towrite.append('\r\n') # end of header writing @@ -277,7 +279,8 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): _writelines(towrite) length[0] = length[0] + sum(map(len, towrite)) except UnicodeEncodeError: - print "Encountered unicode while attempting to write wsgi response: ", [x for x in towrite if isinstance(x, unicode)] + print "Encountered unicode while attempting to write wsgi response: ", \ + [x for x in towrite if isinstance(x, unicode)] traceback.print_exc() _writelines( ["HTTP/1.0 500 Internal Server Error\r\n", @@ -285,7 +288,8 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): "Content-type: text/plain\r\n", "Content-length: 98\r\n", "\r\n", - "Internal Server Error: wsgi application passed a unicode object to the server instead of a string."]) + ("Internal Server Error: wsgi application passed " + "a unicode object to the server instead of a string.")]) def start_response(status, response_headers, exc_info=None): status_code[0] = status.split()[0] @@ -298,7 +302,8 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): # Avoid dangling circular ref exc_info = None - capitalized_headers = [('-'.join([x.capitalize() for x in key.split('-')]), value) + capitalized_headers = [('-'.join([x.capitalize() + for x in key.split('-')]), value) for key, value in response_headers] headers_set[:] = [status, capitalized_headers] @@ -329,17 +334,19 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): write(''.join(towrite)) if not headers_sent or (use_chunked[0] and just_written_size): write('') - except Exception, e: + except Exception: self.close_connection = 1 exc = traceback.format_exc() print exc if not headers_set: - start_response("500 Internal Server Error", [('Content-type', 'text/plain')]) + start_response("500 Internal Server Error", + [('Content-type', 'text/plain')]) write(exc) finally: if hasattr(result, 'close'): result.close() - if self.environ['eventlet.input'].position < self.environ.get('CONTENT_LENGTH', 0): + if (self.environ['eventlet.input'].position + < self.environ.get('CONTENT_LENGTH', 0)): ## Read and discard body if there was no pending 100-continue if not self.environ['eventlet.input'].wfile: while self.environ['eventlet.input'].read(MINIMUM_CHUNK_SIZE): @@ -353,7 +360,7 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): status_code=status_code[0], body_length=length[0], wall_seconds=finish - start)) - + def get_client_ip(self): client_ip = self.client_address[0] if self.server.log_x_forwarded_for: @@ -422,19 +429,19 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): class Server(BaseHTTPServer.HTTPServer): - def __init__(self, - socket, - address, - app, - log=None, - environ=None, - max_http_version=None, - protocol=HttpProtocol, + def __init__(self, + socket, + address, + app, + log=None, + environ=None, + max_http_version=None, + protocol=HttpProtocol, minimum_chunk_size=None, log_x_forwarded_for=True, keepalive=True, log_format=DEFAULT_LOG_FORMAT): - + self.outstanding_requests = 0 self.socket = socket self.address = address @@ -454,7 +461,6 @@ class Server(BaseHTTPServer.HTTPServer): self.log_format = log_format def get_environ(self): - socket = self.socket d = { 'wsgi.errors': sys.stderr, 'wsgi.version': (1, 0), @@ -477,29 +483,29 @@ class Server(BaseHTTPServer.HTTPServer): try: import ssl ACCEPT_EXCEPTIONS = (socket.error, ssl.SSLError) - ACCEPT_ERRNO = set((errno.EPIPE, errno.EBADF, + ACCEPT_ERRNO = set((errno.EPIPE, errno.EBADF, ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_SSL)) except ImportError: ACCEPT_EXCEPTIONS = (socket.error,) ACCEPT_ERRNO = set((errno.EPIPE, errno.EBADF)) -def server(sock, site, - log=None, - environ=None, +def server(sock, site, + log=None, + environ=None, max_size=None, - max_http_version=DEFAULT_MAX_HTTP_VERSION, + max_http_version=DEFAULT_MAX_HTTP_VERSION, protocol=HttpProtocol, - server_event=None, + server_event=None, minimum_chunk_size=None, log_x_forwarded_for=True, custom_pool=None, keepalive=True, log_format=DEFAULT_LOG_FORMAT): - """ Start up a wsgi server handling requests from the supplied server + """ Start up a wsgi server handling requests from the supplied server socket. This function loops forever. The *sock* object will be closed after server exits, but the underlying file descriptor will remain open, so if you have a dup() of *sock*, it will remain usable. - + :param sock: Server socket, must be already bound to a port and listening. :param site: WSGI application function. :param log: File-like object that logs should be written to. If not specified, sys.stderr is used. @@ -514,11 +520,11 @@ def server(sock, site, :param keepalive: If set to False, disables keepalives on the server; all connections will be closed after serving one request. :param log_format: A python format string that is used as the template to generate log lines. The following values can be formatted into it: client_ip, date_time, request_line, status_code, body_length, wall_seconds. Look the default for an example of how to use this. """ - serv = Server(sock, sock.getsockname(), - site, log, - environ=None, - max_http_version=max_http_version, - protocol=protocol, + serv = Server(sock, sock.getsockname(), + site, log, + environ=None, + max_http_version=max_http_version, + protocol=protocol, minimum_chunk_size=minimum_chunk_size, log_x_forwarded_for=log_x_forwarded_for, keepalive=keepalive, @@ -543,12 +549,13 @@ def server(sock, site, if port == ':80': port = '' - serv.log.write("(%s) wsgi starting up on %s://%s%s/\n" % (os.getpid(), scheme, host, port)) + serv.log.write("(%s) wsgi starting up on %s://%s%s/\n" % ( + os.getpid(), scheme, host, port)) while True: try: client_socket = sock.accept() try: - pool.spawn_n(serv.process_request, client_socket) + pool.spawn_n(serv.process_request, client_socket) except AttributeError: warnings.warn("wsgi's pool should be an instance of " \ "eventlet.greenpool.GreenPool, is %s. Please convert your"\ @@ -572,4 +579,3 @@ def server(sock, site, except socket.error, e: if get_errno(e) not in BROKEN_SOCK: traceback.print_exc() - diff --git a/tests/db_pool_test.py b/tests/db_pool_test.py index e1ec284..94c4e12 100644 --- a/tests/db_pool_test.py +++ b/tests/db_pool_test.py @@ -1,11 +1,14 @@ "Test cases for db_pool" +import sys +import os +import traceback +from unittest import TestCase, main from tests import skipped, skip_unless, skip_with_pyevent -from unittest import TestCase, main from eventlet import event from eventlet import db_pool import eventlet -import os + class DBTester(object): __test__ = False # so that nose doesn't try to execute this directly @@ -14,19 +17,19 @@ class DBTester(object): self.connection = None connection = self._dbmodule.connect(**self._auth) cursor = connection.cursor() - cursor.execute("""CREATE TABLE gargleblatz + cursor.execute("""CREATE TABLE gargleblatz ( a INTEGER );""") connection.commit() cursor.close() - + def tearDown(self): if self.connection: self.connection.close() self.drop_db() - def set_up_dummy_table(self, connection = None): + def set_up_dummy_table(self, connection=None): close_connection = False if connection is None: close_connection = True @@ -53,7 +56,7 @@ class DBConnectionPool(DBTester): super(DBConnectionPool, self).setUp() self.pool = self.create_pool() self.connection = self.pool.get() - + def tearDown(self): if self.connection: self.pool.put(self.connection) @@ -84,7 +87,7 @@ class DBConnectionPool(DBTester): self.assert_(False) except AssertionError: raise - except Exception, e: + except Exception: pass cursor.close() @@ -107,7 +110,7 @@ class DBConnectionPool(DBTester): @skipped def test_deletion_does_a_put(self): - # doing a put on del causes some issues if __del__ is called in the + # doing a put on del causes some issues if __del__ is called in the # main coroutine, so, not doing that for now self.assert_(self.pool.free() == 0) self.connection = None @@ -144,7 +147,6 @@ class DBConnectionPool(DBTester): curs.execute(SHORT_QUERY) results.append(2) evt.send() - evt2 = event.Event() eventlet.spawn(a_query) results.append(1) self.assertEqual([1], results) @@ -201,10 +203,10 @@ class DBConnectionPool(DBTester): curs.execute("delete from gargleblatz where a=314159") conn.commit() self.pool.put(conn) - + @skipped def test_two_simultaneous_connections(self): - # timing-sensitive test, disabled until we come up with a better + # timing-sensitive test, disabled until we come up with a better # way to do this self.pool = self.create_pool(2) conn = self.pool.get() @@ -238,36 +240,36 @@ class DBConnectionPool(DBTester): evt2.wait() results.sort() self.assertEqual([1, 2], results) - + def test_clear(self): self.pool = self.create_pool() self.pool.put(self.connection) self.pool.clear() self.assertEqual(len(self.pool.free_items), 0) - + def test_unwrap_connection(self): self.assert_(isinstance(self.connection, db_pool.GenericConnectionWrapper)) conn = self.pool._unwrap_connection(self.connection) self.assert_(not isinstance(conn, db_pool.GenericConnectionWrapper)) - + self.assertEquals(None, self.pool._unwrap_connection(None)) self.assertEquals(None, self.pool._unwrap_connection(1)) - - # testing duck typing here -- as long as the connection has a + + # testing duck typing here -- as long as the connection has a # _base attribute, it should be unwrappable x = Mock() x._base = 'hi' self.assertEquals('hi', self.pool._unwrap_connection(x)) conn.close() - + def test_safe_close(self): self.pool._safe_close(self.connection, quiet=True) self.assertEquals(len(self.pool.free_items), 1) - + self.pool._safe_close(None) self.pool._safe_close(1) - + # now we're really going for 100% coverage x = Mock() def fail(): @@ -280,7 +282,7 @@ class DBConnectionPool(DBTester): raise RuntimeError("if this line has been printed, the test succeeded") x.close = fail2 self.pool._safe_close(x, quiet=False) - + def test_zero_max_idle(self): self.pool.put(self.connection) self.pool.clear() @@ -296,10 +298,13 @@ class DBConnectionPool(DBTester): self.connection = self.pool.get() self.connection.close() self.assertEquals(len(self.pool.free_items), 0) - + @skipped def test_max_idle(self): - # This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while. + # This test is timing-sensitive. Rename the function without + # the "dont" to run it, but beware that it could fail or take + # a while. + self.pool = self.create_pool(max_size=2, max_idle=0.02) self.connection = self.pool.get() self.connection.close() @@ -319,7 +324,10 @@ class DBConnectionPool(DBTester): @skipped def test_max_idle_many(self): - # This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while. + # This test is timing-sensitive. Rename the function without + # the "dont" to run it, but beware that it could fail or take + # a while. + self.pool = self.create_pool(max_size=2, max_idle=0.02) self.connection, conn2 = self.pool.get(), self.pool.get() self.connection.close() @@ -332,7 +340,10 @@ class DBConnectionPool(DBTester): @skipped def test_max_age(self): - # This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while. + # This test is timing-sensitive. Rename the function without + # the "dont" to run it, but beware that it could fail or take + # a while. + self.pool = self.create_pool(max_size=2, max_age=0.05) self.connection = self.pool.get() self.connection.close() @@ -347,7 +358,10 @@ class DBConnectionPool(DBTester): @skipped def test_max_age_many(self): - # This test is timing-sensitive. Rename the function without the "dont" to run it, but beware that it could fail or take a while. + # This test is timing-sensitive. Rename the function without + # the "dont" to run it, but beware that it could fail or take + # a while. + self.pool = self.create_pool(max_size=2, max_age=0.15) self.connection, conn2 = self.pool.get(), self.pool.get() self.connection.close() @@ -366,7 +380,7 @@ class DBConnectionPool(DBTester): self.pool.put(self.connection) self.pool.clear() self.pool = self.create_pool(max_size=1, max_age=0) - + self.connection = self.pool.get() self.assertEquals(self.pool.free(), 0) self.assertEquals(self.pool.waiting(), 0) @@ -397,7 +411,7 @@ class DBConnectionPool(DBTester): def bench(c): for i in xrange(iterations): c.execute('select 1') - + bench(c) # warm-up results = [] for i in xrange(3): @@ -405,7 +419,7 @@ class DBConnectionPool(DBTester): bench(c) end = time.time() results.append(end-start) - + print "\n%u iterations took an average of %f seconds, (%s) in %s\n" % ( iterations, sum(results)/len(results), results, type(self)) @@ -415,29 +429,30 @@ class DBConnectionPool(DBTester): self.pool = self.create_pool(max_size=1, module=RaisingDBModule()) self.assertRaises(RuntimeError, self.pool.get) self.assertEquals(self.pool.free(), 1) - + class RaisingDBModule(object): def connect(self, *args, **kw): raise RuntimeError() - + class TpoolConnectionPool(DBConnectionPool): __test__ = False # so that nose doesn't try to execute this directly - def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout=0.5, module=None): + def create_pool(self, max_size=1, max_idle=10, max_age=10, + connect_timeout=0.5, module=None): if module is None: module = self._dbmodule - return db_pool.TpooledConnectionPool(module, - min_size=0, max_size=max_size, + return db_pool.TpooledConnectionPool(module, + min_size=0, max_size=max_size, max_idle=max_idle, max_age=max_age, connect_timeout = connect_timeout, **self._auth) - + @skip_with_pyevent def setUp(self): super(TpoolConnectionPool, self).setUp() - + def tearDown(self): super(TpoolConnectionPool, self).tearDown() from eventlet import tpool @@ -447,19 +462,21 @@ class TpoolConnectionPool(DBConnectionPool): class RawConnectionPool(DBConnectionPool): __test__ = False # so that nose doesn't try to execute this directly - def create_pool(self, max_size = 1, max_idle = 10, max_age = 10, connect_timeout= 0.5, module=None): + def create_pool(self, max_size=1, max_idle=10, max_age=10, + connect_timeout=0.5, module=None): if module is None: module = self._dbmodule return db_pool.RawConnectionPool(module, - min_size=0, max_size=max_size, + min_size=0, max_size=max_size, max_idle=max_idle, max_age=max_age, connect_timeout=connect_timeout, **self._auth) def get_auth(): - """Looks in the local directory and in the user's home directory for a file named ".test_dbauth", - which contains a json map of parameters to the connect function. + """Looks in the local directory and in the user's home directory + for a file named ".test_dbauth", which contains a json map of + parameters to the connect function. """ files = [os.path.join(os.path.dirname(__file__), '.test_dbauth'), os.path.join(os.path.expanduser('~'), '.test_dbauth')] @@ -473,13 +490,14 @@ def get_auth(): return dict([(str(modname), dict([(str(k), str(v)) for k, v in connectargs.items()])) for modname, connectargs in auth_utf8.items()]) - except (IOError, ImportError), e: + except (IOError, ImportError): pass return {'MySQLdb':{'host': 'localhost','user': 'root','passwd': ''}, 'psycopg2':{'user':'test'}} def mysql_requirement(_f): + verbose = os.environ.get('eventlet_test_mysql_verbose') try: import MySQLdb try: @@ -487,26 +505,27 @@ def mysql_requirement(_f): MySQLdb.connect(**auth) return True except MySQLdb.OperationalError: - print "Skipping mysql tests, error when connecting" - import traceback - traceback.print_exc() + if verbose: + print >> sys.stderr, ">> Skipping mysql tests, error when connecting:" + traceback.print_exc() return False except ImportError: - print "Skipping mysql tests, MySQLdb not importable" + if verbose: + print >> sys.stderr, ">> Skipping mysql tests, MySQLdb not importable" return False class MysqlConnectionPool(object): - dummy_table_sql = """CREATE TEMPORARY TABLE test_table + dummy_table_sql = """CREATE TEMPORARY TABLE test_table ( row_id INTEGER PRIMARY KEY AUTO_INCREMENT, - value_int INTEGER, - value_float FLOAT, - value_string VARCHAR(200), - value_uuid CHAR(36), - value_binary BLOB, - value_binary_string VARCHAR(200) BINARY, - value_enum ENUM('Y','N'), - created TIMESTAMP + value_int INTEGER, + value_float FLOAT, + value_string VARCHAR(200), + value_uuid CHAR(36), + value_binary BLOB, + value_binary_string VARCHAR(200) BINARY, + value_enum ENUM('Y','N'), + created TIMESTAMP ) ENGINE=InnoDB;""" @skip_unless(mysql_requirement) @@ -515,7 +534,7 @@ class MysqlConnectionPool(object): self._dbmodule = MySQLdb self._auth = get_auth()['MySQLdb'] super(MysqlConnectionPool, self).setUp() - + def tearDown(self): super(MysqlConnectionPool, self).tearDown() @@ -561,16 +580,16 @@ def postgres_requirement(_f): class Psycopg2ConnectionPool(object): - dummy_table_sql = """CREATE TEMPORARY TABLE test_table + dummy_table_sql = """CREATE TEMPORARY TABLE test_table ( row_id SERIAL PRIMARY KEY, - value_int INTEGER, - value_float FLOAT, - value_string VARCHAR(200), - value_uuid CHAR(36), - value_binary BYTEA, + value_int INTEGER, + value_float FLOAT, + value_string VARCHAR(200), + value_uuid CHAR(36), + value_binary BYTEA, value_binary_string BYTEA, - created TIMESTAMP + created TIMESTAMP );""" @skip_unless(postgres_requirement) @@ -600,7 +619,6 @@ class Psycopg2ConnectionPool(object): def drop_db(self): auth = self._auth.copy() - dbname = auth.pop('database') conn = self._dbmodule.connect(**auth) conn.set_isolation_level(0) db = conn.cursor() diff --git a/tests/greenio_test.py b/tests/greenio_test.py index 174bdb9..b93e1e4 100644 --- a/tests/greenio_test.py +++ b/tests/greenio_test.py @@ -13,7 +13,7 @@ import array def bufsized(sock, size=1): """ Resize both send and receive buffers on a socket. Useful for testing trampoline. Returns the socket. - + >>> import socket >>> sock = bufsized(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) """ @@ -40,7 +40,7 @@ class TestGreenIo(LimitedTestCase): self.assertEqual(e.args[0], 'timed out') except socket.error, e: # unreachable is also a valid outcome - if e[0] != errno.EHOSTUNREACH: + if not e[0] in (errno.EHOSTUNREACH, errno.ENETUNREACH): raise def test_accept_timeout(self): @@ -62,7 +62,8 @@ class TestGreenIo(LimitedTestCase): s.settimeout(0.1) gs = greenio.GreenSocket(s) e = gs.connect_ex(('192.0.2.1', 80)) - self.assertEquals(e, errno.EAGAIN) + if not e in (errno.EHOSTUNREACH, errno.ENETUNREACH): + self.assertEquals(e, errno.EAGAIN) def test_recv_timeout(self): listener = greenio.GreenSocket(socket.socket()) @@ -249,16 +250,16 @@ class TestGreenIo(LimitedTestCase): self.assertRaises(socket.error, conn.send, 'b') finally: listener.close() - + def did_it_work(server): client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(('127.0.0.1', server.getsockname()[1])) fd = client.makefile() client.close() - assert fd.readline() == 'hello\n' + assert fd.readline() == 'hello\n' assert fd.read() == '' fd.close() - + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) server.bind(('0.0.0.0', 0)) @@ -266,7 +267,7 @@ class TestGreenIo(LimitedTestCase): killer = eventlet.spawn(accept_close_early, server) did_it_work(server) killer.wait() - + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) server.bind(('0.0.0.0', 0)) @@ -274,7 +275,7 @@ class TestGreenIo(LimitedTestCase): killer = eventlet.spawn(accept_close_late, server) did_it_work(server) killer.wait() - + def test_del_closes_socket(self): def accept_once(listener): # delete/overwrite the original conn @@ -298,11 +299,11 @@ class TestGreenIo(LimitedTestCase): client.connect(('127.0.0.1', server.getsockname()[1])) fd = client.makefile() client.close() - assert fd.read() == 'hello\n' + assert fd.read() == 'hello\n' assert fd.read() == '' - + killer.wait() - + def test_full_duplex(self): large_data = '*' * 10 * min_buf_size() listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -313,7 +314,7 @@ class TestGreenIo(LimitedTestCase): def send_large(sock): sock.sendall(large_data) - + def read_large(sock): result = sock.recv(len(large_data)) expected = 'hello world' @@ -332,7 +333,7 @@ class TestGreenIo(LimitedTestCase): result += sock.recv(10) self.assertEquals(result, expected) send_large_coro.wait() - + server_evt = eventlet.spawn(server) client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(('127.0.0.1', listener.getsockname()[1])) @@ -343,7 +344,7 @@ class TestGreenIo(LimitedTestCase): server_evt.wait() large_evt.wait() client.close() - + def test_sendall(self): # test adapted from Marcus Cavanaugh's email # it may legitimately take a while, but will eventually complete @@ -356,7 +357,7 @@ class TestGreenIo(LimitedTestCase): sock = bufsized(sock, size=bufsize) sock.sendall('x'*many_bytes) sock.sendall('y'*second_bytes) - + listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1) listener.bind(("", 0)) @@ -371,20 +372,20 @@ class TestGreenIo(LimitedTestCase): if data == '': break total += len(data) - + total2 = 0 while total < second_bytes: data = client.recv(second_bytes) if data == '': break total2 += len(data) - + sender_coro.wait() client.close() - + for bytes in (1000, 10000, 100000, 1000000): test_sendall_impl(bytes) - + def test_wrap_socket(self): try: import ssl @@ -396,7 +397,7 @@ class TestGreenIo(LimitedTestCase): sock.bind(('127.0.0.1', 0)) sock.listen(50) ssl_sock = ssl.wrap_socket(sock) - + def test_timeout_and_final_write(self): # This test verifies that a write on a socket that we've # stopped listening for doesn't result in an incorrect switch @@ -405,11 +406,11 @@ class TestGreenIo(LimitedTestCase): server.bind(('127.0.0.1', 0)) server.listen(50) bound_port = server.getsockname()[1] - + def sender(evt): s2, addr = server.accept() wrap_wfile = s2.makefile() - + eventlet.sleep(0.02) wrap_wfile.write('hi') s2.close() @@ -476,7 +477,7 @@ class TestGreenIoLong(LimitedTestCase): @skip_with_pyevent def test_multiple_readers(self): recvsize = 2 * min_buf_size() - sendsize = 10 * recvsize + sendsize = 10 * recvsize # test that we can have multiple coroutines reading # from the same fd. We make no guarantees about which one gets which # bytes, but they should both get at least some @@ -486,7 +487,7 @@ class TestGreenIoLong(LimitedTestCase): if data == '': break results.append(data) - + results1 = [] results2 = [] listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -516,7 +517,7 @@ class TestGreenIoLong(LimitedTestCase): listener.close() self.assert_(len(results1) > 0) self.assert_(len(results2) > 0) - - + + if __name__ == '__main__': main()