diff --git a/eventlet/db_pool.py b/eventlet/db_pool.py index cdb29db..337d338 100644 --- a/eventlet/db_pool.py +++ b/eventlet/db_pool.py @@ -77,11 +77,11 @@ The constructor arguments: >>> dc = DatabaseConnector(MySQLdb, {'db.internal.example.com': {'user':'internal', 'passwd':'s33kr1t'}, - 'localhost': + 'localhost': {'user':'root', 'passwd':''}) - + If the credentials contain a host named 'default', then the value for 'default' is used whenever trying to connect to a host that has no explicit entry in the database. This is useful if there is some pool of hosts that share arguments. - + * conn_pool : The connection pool class to use. Defaults to db_pool.ConnectionPool. The rest of the arguments to the DatabaseConnector constructor are passed on to the ConnectionPool. @@ -103,28 +103,28 @@ class ConnectTimeout(Exception): class BaseConnectionPool(Pool): - def __init__(self, db_module, - min_size = 0, max_size = 4, + def __init__(self, db_module, + min_size = 0, max_size = 4, max_idle = 10, max_age = 30, connect_timeout = 5, *args, **kwargs): """ Constructs a pool with at least *min_size* connections and at most *max_size* connections. Uses *db_module* to construct new connections. - + The *max_idle* parameter determines how long pooled connections can - remain idle, in seconds. After *max_idle* seconds have elapsed - without the connection being used, the pool closes the connection. - + remain idle, in seconds. After *max_idle* seconds have elapsed + without the connection being used, the pool closes the connection. + *max_age* is how long any particular connection is allowed to live. Connections that have been open for longer than *max_age* seconds are - closed, regardless of idle time. If *max_age* is 0, all connections are + closed, regardless of idle time. If *max_age* is 0, all connections are closed on return to the pool, reducing it to a concurrency limiter. - - *connect_timeout* is the duration in seconds that the pool will wait + + *connect_timeout* is the duration in seconds that the pool will wait before timing out on connect() to the database. If triggered, the timeout will raise a ConnectTimeout from get(). - + The remainder of the arguments are used as parameters to the *db_module*'s connection constructor. """ @@ -136,33 +136,33 @@ class BaseConnectionPool(Pool): self.max_age = max_age self.connect_timeout = connect_timeout self._expiration_timer = None - super(BaseConnectionPool, self).__init__(min_size=min_size, + super(BaseConnectionPool, self).__init__(min_size=min_size, max_size=max_size, order_as_stack=True) - + def _schedule_expiration(self): - """ Sets up a timer that will call _expire_old_connections when the + """ Sets up a timer that will call _expire_old_connections when the oldest connection currently in the free pool is ready to expire. This is the earliest possible time that a connection could expire, thus, the - timer will be running as infrequently as possible without missing a + timer will be running as infrequently as possible without missing a possible expiration. - - If this function is called when a timer is already scheduled, it does + + If this function is called when a timer is already scheduled, it does nothing. - + If max_age or max_idle is 0, _schedule_expiration likewise does nothing. """ if self.max_age is 0 or self.max_idle is 0: # expiration is unnecessary because all connections will be expired # on put return - - if ( self._expiration_timer is not None + + if ( self._expiration_timer is not None and not getattr(self._expiration_timer, 'called', False) and not getattr(self._expiration_timer, 'cancelled', False) ): # the next timer is already scheduled - return - + return + try: now = time.time() self._expire_old_connections(now) @@ -171,23 +171,23 @@ class BaseConnectionPool(Pool): idle_delay = (self.free_items[-1][0] - now) + self.max_idle oldest = min([t[1] for t in self.free_items]) age_delay = (oldest - now) + self.max_age - + next_delay = min(idle_delay, age_delay) except IndexError, ValueError: # no free items, unschedule ourselves self._expiration_timer = None return - + if next_delay > 0: # set up a continuous self-calling loop self._expiration_timer = api.call_after(next_delay, self._schedule_expiration) - + def _expire_old_connections(self, now): """ Iterates through the open connections contained in the pool, closing ones that have remained idle for longer than max_idle seconds, or have been in existence for longer than max_age seconds. - + *now* is the current time, as returned by time.time(). """ original_count = len(self.free_items) @@ -204,8 +204,8 @@ class BaseConnectionPool(Pool): if not self._is_expired(now, last_used, created_at)] self.free_items.clear() self.free_items.extend(new_free) - - # adjust the current size counter to account for expired + + # adjust the current size counter to account for expired # connections self.current_size -= original_count - len(self.free_items) @@ -217,7 +217,7 @@ class BaseConnectionPool(Pool): or now - created_at > self.max_age ): return True return False - + def _unwrap_connection(self, conn): """ If the connection was wrapped by a subclass of BaseConnectionWrapper and is still functional (as determined @@ -250,7 +250,7 @@ class BaseConnectionPool(Pool): def get(self): conn = super(BaseConnectionPool, self).get() - + # None is a flag value that means that put got called with # something it couldn't use if conn is None: @@ -270,7 +270,7 @@ class BaseConnectionPool(Pool): _last_used, created_at, conn = conn else: created_at = time.time() - + # wrap the connection so the consumer can call close() safely wrapped = PooledConnectionWrapper(conn, self) # annotating the wrapper so that when it gets put in the pool @@ -282,7 +282,7 @@ class BaseConnectionPool(Pool): created_at = getattr(conn, '_db_pool_created_at', 0) now = time.time() conn = self._unwrap_connection(conn) - + if self._is_expired(now, now, created_at): self._safe_close(conn, quiet=False) conn = None @@ -314,7 +314,7 @@ class BaseConnectionPool(Pool): self._schedule_expiration() def clear(self): - """ Close all connections that this pool still holds a reference to, + """ Close all connections that this pool still holds a reference to, and removes all references to them. """ if self._expiration_timer: @@ -322,10 +322,10 @@ class BaseConnectionPool(Pool): free_items, self.free_items = self.free_items, deque() for _last_used, _created_at, conn in free_items: self._safe_close(conn, quiet=True) - + def __del__(self): self.clear() - + class SaranwrappedConnectionPool(BaseConnectionPool): """A pool which gives out saranwrapped database connections. @@ -343,9 +343,9 @@ class SaranwrappedConnectionPool(BaseConnectionPool): return saranwrap.wrap(db_module).connect(*args, **kw) finally: timeout.cancel() - + connect = classmethod(connect) - + class TpooledConnectionPool(BaseConnectionPool): """A pool which gives out tpool.Proxy-based database connections. @@ -462,7 +462,7 @@ class PooledConnectionWrapper(GenericConnectionWrapper): if self and self._pool: self._pool.put(self) self._destroy() - + def __del__(self): self.close() @@ -471,7 +471,7 @@ class DatabaseConnector(object): """\ @brief This is an object which will maintain a collection of database connection pools on a per-host basis.""" - def __init__(self, module, credentials, + def __init__(self, module, credentials, conn_pool=None, *args, **kwargs): """\ @brief constructor diff --git a/eventlet/exc.py b/eventlet/exc.py index 93673c0..1f148ce 100644 --- a/eventlet/exc.py +++ b/eventlet/exc.py @@ -39,7 +39,7 @@ def format_exc(exc=None): frames = [] while exc_tb is not None: f = exc_tb.tb_frame - + frames.append(( f.f_code.co_name, f.f_code.co_filename, diff --git a/eventlet/hubs/libev.py b/eventlet/hubs/libev.py index 090567a..54af53f 100644 --- a/eventlet/hubs/libev.py +++ b/eventlet/hubs/libev.py @@ -71,7 +71,7 @@ class Hub(hub.BaseHub): def signal_received(self, signal): # can't do more than set this flag here because the pyevent callback - # mechanism swallows exceptions raised here, so we have to raise in + # mechanism swallows exceptions raised here, so we have to raise in # the 'main' greenlet (in wait()) to kill the program self.interrupted = True self._evloop.unloop() @@ -97,7 +97,7 @@ class Hub(hub.BaseHub): # raise any signals that deserve raising if self.interrupted: self.interrupted = False - raise KeyboardInterrupt() + raise KeyboardInterrupt() def add_timer(self, timer): # store the pyevent timer object so that we can cancel later diff --git a/eventlet/hubs/nginx.py b/eventlet/hubs/nginx.py index 636047a..d0fc9ce 100644 --- a/eventlet/hubs/nginx.py +++ b/eventlet/hubs/nginx.py @@ -116,7 +116,7 @@ class Hub(hub.BaseHub): response = StartResponse() result = slave.switch( api.getcurrent(), env, response) - + while True: self.current_application = api.getcurrent() print "RESULT", result, callable(result[0]) diff --git a/eventlet/hubs/poll.py b/eventlet/hubs/poll.py index 7d18bd1..f37bd03 100644 --- a/eventlet/hubs/poll.py +++ b/eventlet/hubs/poll.py @@ -55,7 +55,7 @@ class Hub(hub.BaseHub): mask |= WRITE_MASK return mask - + def remove_descriptor(self, fileno): super(Hub, self).remove_descriptor(fileno) try: diff --git a/eventlet/saranwrap.py b/eventlet/saranwrap.py index baa9ff4..77fe64b 100644 --- a/eventlet/saranwrap.py +++ b/eventlet/saranwrap.py @@ -275,7 +275,7 @@ class ChildProcess(object): retval = _read_response(_id, attribute, self._in, self) finally: self._lock.put(t) - + return retval def __del__(self): @@ -323,7 +323,7 @@ not supported, so you have to know what has been exported. _dead_list.remove(dead_object) except KeyError: pass - + # Pass all public attributes across to find out if it is # callable or a simple attribute. request = Request('getattr', {'id':my_id, 'attribute':attribute}) @@ -332,7 +332,7 @@ not supported, so you have to know what has been exported. def __setattr__(self, attribute, value): #_prnt("Proxy::__setattr__: %s" % attribute) if _is_local(attribute): - # It must be local to this actual object, so we have to apply + # It must be local to this actual object, so we have to apply # it to the dict in a roundabout way attribute = _unmunge_attr_name(attribute) super(Proxy, self).__getattribute__('__dict__')[attribute]=value @@ -370,7 +370,7 @@ not need to deal with this class directly.""" my_id = self.__local_dict['_id'] request = Request('getitem', {'id':my_id, 'key':key}) return my_cp.make_request(request, attribute=key) - + def __setitem__(self, key, value): my_cp = self.__local_dict['_cp'] my_id = self.__local_dict['_id'] @@ -422,7 +422,7 @@ not need to deal with this class directly.""" # since the remote object is being serialized whole anyway, # there's no semantic difference between copy and deepcopy __copy__ = __deepcopy__ - + def proxied_type(self): """ Returns the type of the object in the child process. @@ -432,7 +432,7 @@ def proxied_type(self): 'real' type value.""" if type(self) is not ObjectProxy: return type(self) - + my_cp = self.__local_dict['_cp'] my_id = self.__local_dict['_id'] request = Request('type', {'id':my_id}) @@ -499,7 +499,7 @@ when the id is None.""" else: raise e #_log('getattr: %s' % str(response)) - + def handle_setattr(self, obj, req): try: return setattr(obj, req['attribute'], req['value']) @@ -534,7 +534,7 @@ when the id is None.""" fn = obj[req['name']] else: raise e - + return fn(*req['args'],**req['kwargs']) def handle_del(self, obj, req): diff --git a/eventlet/support/pylib.py b/eventlet/support/pylib.py index 41107ab..b9c2fa5 100644 --- a/eventlet/support/pylib.py +++ b/eventlet/support/pylib.py @@ -22,11 +22,9 @@ from py.magic import greenlet - import sys import types - def emulate(): module = types.ModuleType('greenlet') sys.modules['greenlet'] = module @@ -34,5 +32,3 @@ def emulate(): module.getcurrent = greenlet.getcurrent module.GreenletExit = greenlet.GreenletExit - - diff --git a/eventlet/support/stacklesspypys.py b/eventlet/support/stacklesspypys.py index cb3041f..1f79257 100644 --- a/eventlet/support/stacklesspypys.py +++ b/eventlet/support/stacklesspypys.py @@ -21,11 +21,9 @@ from stackless import greenlet - import sys import types - def emulate(): module = types.ModuleType('greenlet') sys.modules['greenlet'] = module @@ -33,8 +31,3 @@ def emulate(): module.getcurrent = greenlet.getcurrent module.GreenletExit = greenlet.GreenletExit - - - - - diff --git a/eventlet/support/stacklesss.py b/eventlet/support/stacklesss.py index 278a884..0a86d56 100644 --- a/eventlet/support/stacklesss.py +++ b/eventlet/support/stacklesss.py @@ -30,11 +30,7 @@ import traceback caller = None - - coro_args = {} - - tasklet_to_greenlet = {} diff --git a/eventlet/timer.py b/eventlet/timer.py index ce881ea..8b71cd3 100644 --- a/eventlet/timer.py +++ b/eventlet/timer.py @@ -93,7 +93,7 @@ class Timer(object): pass class LocalTimer(Timer): - + def __init__(self, *args, **kwargs): self.greenlet = getcurrent() Timer.__init__(self, *args, **kwargs) diff --git a/eventlet/util.py b/eventlet/util.py index f26885f..694ffe1 100644 --- a/eventlet/util.py +++ b/eventlet/util.py @@ -234,7 +234,7 @@ def wrap_threading_local_with_coro_local(): class local(object): def __init__(self): self.__dict__['__objs'] = {} - + def __getattr__(self, attr, g=get_ident): try: return self.__dict__['__objs'][g()][attr] @@ -242,10 +242,10 @@ def wrap_threading_local_with_coro_local(): raise AttributeError( "No variable %s defined for the thread %s" % (attr, g())) - + def __setattr__(self, attr, value, g=get_ident): self.__dict__['__objs'].setdefault(g(), {})[attr] = value - + def __delattr__(self, attr, g=get_ident): try: del self.__dict__['__objs'][g()][attr] diff --git a/eventlet/wsgi.py b/eventlet/wsgi.py index 85b6b33..0992ae6 100644 --- a/eventlet/wsgi.py +++ b/eventlet/wsgi.py @@ -124,7 +124,7 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): if e[0] != errno.EBADF: raise self.raw_requestline = '' - + if not self.raw_requestline: self.close_connection = 1 return @@ -254,7 +254,7 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): start_response("500 Internal Server Error", [('Content-type', 'text/plain')]) write(exc) return - + if towrite: write(''.join(towrite)) if not headers_sent: @@ -346,7 +346,7 @@ class Server(BaseHTTPServer.HTTPServer): self.log = sys.stderr self.app = app self.environ = environ - self.max_http_version = max_http_version + self.max_http_version = max_http_version self.protocol = protocol self.pid = os.getpid() if minimum_chunk_size is not None: diff --git a/greentest/api_test.py b/greentest/api_test.py index 4c60dbe..5304a3f 100644 --- a/greentest/api_test.py +++ b/greentest/api_test.py @@ -47,10 +47,10 @@ def check_hub(): class TestApi(tests.TestCase): mode = 'static' - + certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt') private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key') - + def test_tcp_listener(self): socket = api.tcp_listener(('0.0.0.0', 0)) assert socket.getsockname()[0] == '0.0.0.0' @@ -83,27 +83,27 @@ class TestApi(tests.TestCase): check_hub() def test_connect_ssl(self): - def accept_once(listenfd): - try: + def accept_once(listenfd): + try: conn, addr = listenfd.accept() fl = conn.makeGreenFile('w') fl.write('hello\r\n') fl.close() - conn.close() - finally: - listenfd.close() - - server = api.ssl_listener(('0.0.0.0', 0), - self.certificate_file, - self.private_key_file) - api.spawn(accept_once, server) - - client = util.wrap_ssl( + conn.close() + finally: + listenfd.close() + + server = api.ssl_listener(('0.0.0.0', 0), + self.certificate_file, + self.private_key_file) + api.spawn(accept_once, server) + + client = util.wrap_ssl( api.connect_tcp(('127.0.0.1', server.getsockname()[1]))) client = client.makeGreenFile() - assert client.readline() == 'hello\r\n' - assert client.read() == '' + assert client.readline() == 'hello\r\n' + assert client.read() == '' client.close() def test_server(self): diff --git a/greentest/coros_test.py b/greentest/coros_test.py index 46151b3..e83510b 100644 --- a/greentest/coros_test.py +++ b/greentest/coros_test.py @@ -22,7 +22,6 @@ from greentest import tests from eventlet import coros, api - class TestEvent(tests.TestCase): mode = 'static' def setUp(self): diff --git a/greentest/httpd_test.py b/greentest/httpd_test.py index 696ba93..b51debe 100644 --- a/greentest/httpd_test.py +++ b/greentest/httpd_test.py @@ -100,7 +100,7 @@ class TestHttpd(tests.TestCase): def test_001_server(self): sock = api.connect_tcp( ('127.0.0.1', 12346)) - + fd = sock.makeGreenFile() fd.write('GET / HTTP/1.0\r\nHost: localhost\r\n\r\n') result = fd.read() @@ -112,7 +112,7 @@ class TestHttpd(tests.TestCase): def test_002_keepalive(self): sock = api.connect_tcp( ('127.0.0.1', 12346)) - + fd = sock.makeGreenFile() fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') read_http(sock) @@ -124,7 +124,7 @@ class TestHttpd(tests.TestCase): # This should go in greenio_test sock = api.connect_tcp( ('127.0.0.1', 12346)) - + fd = sock.makeGreenFile() fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') cancel = api.exc_after(1, RuntimeError) @@ -166,7 +166,7 @@ class TestHttpd(tests.TestCase): status = result.split(' ')[1] self.assertEqual(status, '414') fd.close() - + def test_007_get_arg(self): # define a new handler that does a get_arg as well as a read_body def new_handle_request(req): @@ -174,28 +174,28 @@ class TestHttpd(tests.TestCase): body = req.read_body() req.write('a is %s, body is %s' % (a, body)) self.site.handle_request = new_handle_request - + sock = api.connect_tcp( ('127.0.0.1', 12346)) request = '\r\n'.join(( - 'POST /%s HTTP/1.0', - 'Host: localhost', - 'Content-Length: 3', + 'POST /%s HTTP/1.0', + 'Host: localhost', + 'Content-Length: 3', '', 'a=a')) fd = sock.makeGreenFile() fd.write(request) - + # send some junk after the actual request fd.write('01234567890123456789') reqline, headers, body = read_http(sock) self.assertEqual(body, 'a is a, body is a=a') fd.close() - + def test_008_correctresponse(self): sock = api.connect_tcp( ('127.0.0.1', 12346)) - + fd = sock.makeGreenFile() fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') response_line_200,_,_ = read_http(sock) diff --git a/greentest/tests.py b/greentest/tests.py index c13ef9f..19d93d9 100644 --- a/greentest/tests.py +++ b/greentest/tests.py @@ -41,24 +41,24 @@ def find_command(command): if os.access(p, os.X_OK): return p raise IOError(errno.ENOENT, 'Command not found: %r' % command) - + def run_all_tests(test_files = doc_test_files): - """ Runs all the unit tests, returning immediately after the + """ Runs all the unit tests, returning immediately after the first failed test. - + Returns true if the tests all succeeded. This method is really much longer than it ought to be. """ eventlet_dir = os.path.realpath(os.path.dirname(__file__)) if eventlet_dir not in sys.path: sys.path.append(eventlet_dir) - + # add all _test files as a policy import glob - test_files += [os.path.splitext(os.path.basename(x))[0] + test_files += [os.path.splitext(os.path.basename(x))[0] for x in glob.glob(os.path.join(eventlet_dir, "*_test.py"))] test_files.sort() - + for test_file in test_files: print "-=", test_file, "=-" try: @@ -66,21 +66,21 @@ def run_all_tests(test_files = doc_test_files): except ImportError: print "Unable to import %s, skipping" % test_file continue - + if test_file.endswith('_test'): # gawd, unittest, why you make it so difficult to just run some tests! suite = unittest.findTestCases(test_module) result = unittest.TextTestRunner().run(suite) if not result.wasSuccessful(): return False - else: + else: failures, tests = doctest.testmod(test_module) if failures: return False else: print "OK" - + return True - + if __name__ == '__main__': run_all_tests()