From bfa8cfad9ac927616685edc21c5839db2c89fcd3 Mon Sep 17 00:00:00 2001 From: rdw Date: Wed, 13 Aug 2008 14:34:39 -0700 Subject: [PATCH] Time-based expiry for db_pool. This adds the ability to expire connections by idleness and also to cap them to a finite lifespan (which will come in handy for e.g. mysql, which holds some locks on a per-connection basis). --- eventlet/db_pool.py | 184 ++++++++++++++++++++++++++++++++++++-------- eventlet/pools.py | 29 ++++++- 2 files changed, 177 insertions(+), 36 deletions(-) diff --git a/eventlet/db_pool.py b/eventlet/db_pool.py index aefe5f8..6ed9c35 100644 --- a/eventlet/db_pool.py +++ b/eventlet/db_pool.py @@ -63,29 +63,157 @@ connection pools keyed on host,databasename""" new_kwargs['db'] = dbname new_kwargs['host'] = host new_kwargs.update(self.credentials_for(host)) - dbpool = self._conn_pool_class(self._module, min_size=self._min_size, max_size=self._max_size, - *self._args, **new_kwargs) + dbpool = self._conn_pool_class(self._module, + min_size=self._min_size, max_size=self._max_size, + *self._args, **new_kwargs) self._databases[key] = dbpool return self._databases[key] + class BaseConnectionPool(Pool): - # *TODO: we need to expire and close connections if they've been - # idle for a while, so that system-wide connection count doesn't - # monotonically increase forever - def __init__(self, db_module, min_size = 0, max_size = 4, *args, **kwargs): + def __init__(self, db_module, + min_size = 0, max_size = 4, + max_idle = 10, max_age = 30, + *args, **kwargs): + """ + Constructs a pool with at least *min_size* connections and at most + *max_size* connections. Uses *db_module* to construct new connections. + + The *max_idle* parameter determines how long pooled connections can + remain idle, in seconds. After *max_idle* seconds have elapsed + without the connection being used, the pool closes the connection. + + *max_age* is how long any particular connection is allowed to live. + Connections that have been open for longer than *max_age* seconds are + closed, regardless of idle time. If *max_age* is 0, all connections are + closed on return to the pool, reducing it to a concurrency limiter. + + The remainder of the arguments are used as parameters to the + *db_module*'s connection constructor. + """ assert(db_module) self._db_module = db_module self._args = args self._kwargs = kwargs - super(BaseConnectionPool, self).__init__(min_size, max_size) + self.max_idle = max_idle + self.max_age = max_age + self._expiration_timer = None + super(BaseConnectionPool, self).__init__(min_size=min_size, + max_size=max_size, + order_as_stack=True) + + def _schedule_expiration(self): + """ Sets up a timer that will call _expire_old_connections when the + oldest connection currently in the free pool is ready to expire. This + is the earliest possible time that a connection could expire, thus, the + timer will be running as infrequently as possible without missing a + possible expiration. + + If this function is called when a timer is already scheduled, it does + nothing. + + If max_age or max_idle is 0, _schedule_expiration likewise does nothing. + """ + if self.max_age is 0 or self.max_idle is 0: + # expiration is unnecessary because all connections will be expired + # on put + return + + if ( self._expiration_timer is not None + and not getattr(self._expiration_timer, 'called', False) + and not getattr(self._expiration_timer, 'cancelled', False) ): + # the next timer is already scheduled + return + + try: + now = time.time() + self._expire_old_connections(now) + # the last item in the list, because of the stack ordering, + # is going to be the most-idle + idle_delay = (self.free_items[-1][0] - now) + self.max_idle + oldest = min([t[1] for t in self.free_items]) + age_delay = (oldest - now) + self.max_age + + next_delay = min(idle_delay, age_delay) + except IndexError, ValueError: + # no free items, unschedule ourselves + self._expiration_timer = None + return + + if next_delay > 0: + # set up a continuous self-calling loop + self._expiration_timer = api.call_after(next_delay, + self._schedule_expiration) + + def _expire_old_connections(self, now): + """ Iterates through the open connections contained in the pool, closing + ones that have remained idle for longer than max_idle seconds, or have + been in existence for longer than max_age seconds. + + *now* is the current time, as returned by time.time(). + """ + original_count = len(free_items) + self.free_items = deque([ + (last_used, created_at, conn) + for last_used, created_at, conn in self.free_items + if not self._is_expired(now, last_used, created_at, conn)]) + + # adjust the current size counter to account for expired + # connections + self.current_size -= original_count - len(self.free_items) + + def _is_expired(self, now, last_used, created_at, conn): + """ Returns true and closes the connection if it's expired.""" + if ( self.max_idle <= 0 + or self.max_age <= 0 + or now - last_used > self.max_idle + or now - created_at > self.max_age ): + self._safe_close(conn, quiet=True) + return True + return False + + def _unwrap_connection(self, conn): + """ If the connection was wrapped by a subclass of BaseConnectionWrapper + and is still functional (as determined by the __nonzero__ method), + returns the unwrapped connection. If anything goes wrong with this + process, returns None. + """ + base = None + try: + if conn: + base = conn._base + conn._destroy() + else: + base = None + except AttributeError: + pass + return base + + def _safe_close(self, conn, quiet = False): + """ Closes the connection, squelching any exceptions. """ + try: + conn.close() + except KeyboardInterrupt: + raise + except AttributeError: + pass # conn is None, or junk + except: + if not quiet: + print "Connection.close raised: %s" % (sys.exc_info()[1]) def get(self): - # wrap the connection for easier use - conn = super(BaseConnectionPool, self).get() - return PooledConnectionWrapper(conn, self) + # wrap the connection so the consumer can call close() safely + _last_used, created_at, conn = super(BaseConnectionPool, self).get() + wrapped = PooledConnectionWrapper(conn, self) + # annotating the wrapper so that when it gets put in the pool + # again, we'll know how old it is + wrapped._db_pool_created_at = created_at + return wrapped def put(self, conn): + created_at = getattr(conn, '_db_pool_created_at', 0) + # rollback any uncommitted changes, so that the next client # has a clean slate. This also pokes the connection to see if # it's dead or None @@ -93,38 +221,28 @@ class BaseConnectionPool(Pool): conn.rollback() except KeyboardInterrupt: raise - except AttributeError, e: - # this means it's already been destroyed, so we don't need to print anything - conn = None except: # we don't care what the exception was, we just know the # connection is dead print "WARNING: connection.rollback raised: %s" % (sys.exc_info()[1]) conn = None - - # unwrap the connection for storage - if isinstance(conn, GenericConnectionWrapper): - if conn: - base = conn._base - conn._destroy() - conn = base - else: - conn = None - if conn is not None: - super(BaseConnectionPool, self).put(conn) + base = self._unwrap_connection(conn) + now = time.time() + if (base is not None + and not self._expired(now, now, created_at, base)): + super(BaseConnectionPool, self).put( (now, created_at, base) ) else: self.current_size -= 1 + + self._schedule_expiration() def clear(self): - """ Close all connections that this pool still holds a reference to, leaving it empty.""" - for conn in self.free_items: - try: - conn.close() - except KeyboardInterrupt: - raise - except: - pass # even if stuff happens here, we still want to at least try to close all the other connections + """ Close all connections that this pool still holds a reference to, + and removes all references to them. + """ + for _last_used, _created_at, conn in self.free_items: + self._safe_close(conn, quiet=True) self.free_items.clear() def __del__(self): @@ -206,7 +324,7 @@ class GenericConnectionWrapper(object): class PooledConnectionWrapper(GenericConnectionWrapper): """ A connection wrapper where: - the close method returns the connection to the pool instead of closing it directly - - you can do if conn: + - bool(conn) returns a reasonable value - returns itself to the pool if it gets garbage collected """ def __init__(self, baseconn, pool): diff --git a/eventlet/pools.py b/eventlet/pools.py index a1af247..b11b01b 100644 --- a/eventlet/pools.py +++ b/eventlet/pools.py @@ -40,14 +40,34 @@ class Pool(object): # do stuff finally: self.pool.put(thing) + + The maximum size of the pool can be modified at runtime via the max_size attribute. + Adjusting this number does not affect existing items checked out of the pool, nor + on any waiters who are waiting for an item to free up. Some indeterminate number + of get/put cycles will be necessary before the new maximum size truly matches the + actual operation of the pool. """ - def __init__(self, min_size=0, max_size=4): + def __init__(self, min_size=0, max_size=4, order_as_stack=False): + """ Pre-populates the pool with *min_size* items. Sets a hard limit to + the size of the pool -- it cannot contain any more items than + *max_size*, and if there are already *max_size* items 'checked out' of + the pool, the pool will cause any getter to cooperatively yield until an + item is put in. + + *order_as_stack* governs the ordering of the items in the free pool. If + False (the default), the free items collection (of items that were + created and were put back in the pool) acts as a round-robin, giving + each item approximately equal utilization. If True, the free pool acts + as a FILO stack, which preferentially re-uses items that have most + recently been used. + """ self.min_size = min_size self.max_size = max_size + self.order_as_stack = order_as_stack self.current_size = 0 self.channel = channel.channel() self.free_items = collections.deque() - for x in range(min_size): + for x in xrange(min_size): self.current_size += 1 self.free_items.append(self.create()) @@ -71,7 +91,10 @@ class Pool(object): if self.channel.balance < 0: self.channel.send(item) else: - self.free_items.append(item) + if self.order_as_stack: + self.free_items.appendleft(item) + else: + self.free_items.append(item) def resize(self, new_size): """Resize the pool