Time-based expiry for db_pool. This adds the ability to expire connections by idleness and also to cap them to a finite lifespan (which will come in handy for e.g. mysql, which holds some locks on a per-connection basis).

This commit is contained in:
rdw
2008-08-13 14:34:39 -07:00
parent 2f62137e89
commit bfa8cfad9a
2 changed files with 177 additions and 36 deletions

View File

@@ -63,29 +63,157 @@ connection pools keyed on host,databasename"""
new_kwargs['db'] = dbname
new_kwargs['host'] = host
new_kwargs.update(self.credentials_for(host))
dbpool = self._conn_pool_class(self._module, min_size=self._min_size, max_size=self._max_size,
*self._args, **new_kwargs)
dbpool = self._conn_pool_class(self._module,
min_size=self._min_size, max_size=self._max_size,
*self._args, **new_kwargs)
self._databases[key] = dbpool
return self._databases[key]
class BaseConnectionPool(Pool):
# *TODO: we need to expire and close connections if they've been
# idle for a while, so that system-wide connection count doesn't
# monotonically increase forever
def __init__(self, db_module, min_size = 0, max_size = 4, *args, **kwargs):
def __init__(self, db_module,
min_size = 0, max_size = 4,
max_idle = 10, max_age = 30,
*args, **kwargs):
"""
Constructs a pool with at least *min_size* connections and at most
*max_size* connections. Uses *db_module* to construct new connections.
The *max_idle* parameter determines how long pooled connections can
remain idle, in seconds. After *max_idle* seconds have elapsed
without the connection being used, the pool closes the connection.
*max_age* is how long any particular connection is allowed to live.
Connections that have been open for longer than *max_age* seconds are
closed, regardless of idle time. If *max_age* is 0, all connections are
closed on return to the pool, reducing it to a concurrency limiter.
The remainder of the arguments are used as parameters to the
*db_module*'s connection constructor.
"""
assert(db_module)
self._db_module = db_module
self._args = args
self._kwargs = kwargs
super(BaseConnectionPool, self).__init__(min_size, max_size)
self.max_idle = max_idle
self.max_age = max_age
self._expiration_timer = None
super(BaseConnectionPool, self).__init__(min_size=min_size,
max_size=max_size,
order_as_stack=True)
def _schedule_expiration(self):
""" Sets up a timer that will call _expire_old_connections when the
oldest connection currently in the free pool is ready to expire. This
is the earliest possible time that a connection could expire, thus, the
timer will be running as infrequently as possible without missing a
possible expiration.
If this function is called when a timer is already scheduled, it does
nothing.
If max_age or max_idle is 0, _schedule_expiration likewise does nothing.
"""
if self.max_age is 0 or self.max_idle is 0:
# expiration is unnecessary because all connections will be expired
# on put
return
if ( self._expiration_timer is not None
and not getattr(self._expiration_timer, 'called', False)
and not getattr(self._expiration_timer, 'cancelled', False) ):
# the next timer is already scheduled
return
try:
now = time.time()
self._expire_old_connections(now)
# the last item in the list, because of the stack ordering,
# is going to be the most-idle
idle_delay = (self.free_items[-1][0] - now) + self.max_idle
oldest = min([t[1] for t in self.free_items])
age_delay = (oldest - now) + self.max_age
next_delay = min(idle_delay, age_delay)
except IndexError, ValueError:
# no free items, unschedule ourselves
self._expiration_timer = None
return
if next_delay > 0:
# set up a continuous self-calling loop
self._expiration_timer = api.call_after(next_delay,
self._schedule_expiration)
def _expire_old_connections(self, now):
""" Iterates through the open connections contained in the pool, closing
ones that have remained idle for longer than max_idle seconds, or have
been in existence for longer than max_age seconds.
*now* is the current time, as returned by time.time().
"""
original_count = len(free_items)
self.free_items = deque([
(last_used, created_at, conn)
for last_used, created_at, conn in self.free_items
if not self._is_expired(now, last_used, created_at, conn)])
# adjust the current size counter to account for expired
# connections
self.current_size -= original_count - len(self.free_items)
def _is_expired(self, now, last_used, created_at, conn):
""" Returns true and closes the connection if it's expired."""
if ( self.max_idle <= 0
or self.max_age <= 0
or now - last_used > self.max_idle
or now - created_at > self.max_age ):
self._safe_close(conn, quiet=True)
return True
return False
def _unwrap_connection(self, conn):
""" If the connection was wrapped by a subclass of BaseConnectionWrapper
and is still functional (as determined by the __nonzero__ method),
returns the unwrapped connection. If anything goes wrong with this
process, returns None.
"""
base = None
try:
if conn:
base = conn._base
conn._destroy()
else:
base = None
except AttributeError:
pass
return base
def _safe_close(self, conn, quiet = False):
""" Closes the connection, squelching any exceptions. """
try:
conn.close()
except KeyboardInterrupt:
raise
except AttributeError:
pass # conn is None, or junk
except:
if not quiet:
print "Connection.close raised: %s" % (sys.exc_info()[1])
def get(self):
# wrap the connection for easier use
conn = super(BaseConnectionPool, self).get()
return PooledConnectionWrapper(conn, self)
# wrap the connection so the consumer can call close() safely
_last_used, created_at, conn = super(BaseConnectionPool, self).get()
wrapped = PooledConnectionWrapper(conn, self)
# annotating the wrapper so that when it gets put in the pool
# again, we'll know how old it is
wrapped._db_pool_created_at = created_at
return wrapped
def put(self, conn):
created_at = getattr(conn, '_db_pool_created_at', 0)
# rollback any uncommitted changes, so that the next client
# has a clean slate. This also pokes the connection to see if
# it's dead or None
@@ -93,38 +221,28 @@ class BaseConnectionPool(Pool):
conn.rollback()
except KeyboardInterrupt:
raise
except AttributeError, e:
# this means it's already been destroyed, so we don't need to print anything
conn = None
except:
# we don't care what the exception was, we just know the
# connection is dead
print "WARNING: connection.rollback raised: %s" % (sys.exc_info()[1])
conn = None
# unwrap the connection for storage
if isinstance(conn, GenericConnectionWrapper):
if conn:
base = conn._base
conn._destroy()
conn = base
else:
conn = None
if conn is not None:
super(BaseConnectionPool, self).put(conn)
base = self._unwrap_connection(conn)
now = time.time()
if (base is not None
and not self._expired(now, now, created_at, base)):
super(BaseConnectionPool, self).put( (now, created_at, base) )
else:
self.current_size -= 1
self._schedule_expiration()
def clear(self):
""" Close all connections that this pool still holds a reference to, leaving it empty."""
for conn in self.free_items:
try:
conn.close()
except KeyboardInterrupt:
raise
except:
pass # even if stuff happens here, we still want to at least try to close all the other connections
""" Close all connections that this pool still holds a reference to,
and removes all references to them.
"""
for _last_used, _created_at, conn in self.free_items:
self._safe_close(conn, quiet=True)
self.free_items.clear()
def __del__(self):
@@ -206,7 +324,7 @@ class GenericConnectionWrapper(object):
class PooledConnectionWrapper(GenericConnectionWrapper):
""" A connection wrapper where:
- the close method returns the connection to the pool instead of closing it directly
- you can do if conn:
- bool(conn) returns a reasonable value
- returns itself to the pool if it gets garbage collected
"""
def __init__(self, baseconn, pool):

View File

@@ -40,14 +40,34 @@ class Pool(object):
# do stuff
finally:
self.pool.put(thing)
The maximum size of the pool can be modified at runtime via the max_size attribute.
Adjusting this number does not affect existing items checked out of the pool, nor
on any waiters who are waiting for an item to free up. Some indeterminate number
of get/put cycles will be necessary before the new maximum size truly matches the
actual operation of the pool.
"""
def __init__(self, min_size=0, max_size=4):
def __init__(self, min_size=0, max_size=4, order_as_stack=False):
""" Pre-populates the pool with *min_size* items. Sets a hard limit to
the size of the pool -- it cannot contain any more items than
*max_size*, and if there are already *max_size* items 'checked out' of
the pool, the pool will cause any getter to cooperatively yield until an
item is put in.
*order_as_stack* governs the ordering of the items in the free pool. If
False (the default), the free items collection (of items that were
created and were put back in the pool) acts as a round-robin, giving
each item approximately equal utilization. If True, the free pool acts
as a FILO stack, which preferentially re-uses items that have most
recently been used.
"""
self.min_size = min_size
self.max_size = max_size
self.order_as_stack = order_as_stack
self.current_size = 0
self.channel = channel.channel()
self.free_items = collections.deque()
for x in range(min_size):
for x in xrange(min_size):
self.current_size += 1
self.free_items.append(self.create())
@@ -71,7 +91,10 @@ class Pool(object):
if self.channel.balance < 0:
self.channel.send(item)
else:
self.free_items.append(item)
if self.order_as_stack:
self.free_items.appendleft(item)
else:
self.free_items.append(item)
def resize(self, new_size):
"""Resize the pool