Introduce TTL for idle connections

We can reduce a workload of rabbitmq through implementation
of expiration mechanism for idle connections in the pool with
next properties:

 conn_pool_ttl (default 20 min)
 conn_pool_min_size: the pool size limit for expire() (default 2)

The problem is timeless idle connections in the pool, which can be created
via some single huge workload of RPCServer. One SEND connection is heartbeat
thread + some network activity every n second. So, we can reduce it.

Here is two ways to implement an expiration:

 [1] Create a separated thread for checking expire date of connections
 [2] Make call expire() on pool.get() or pool.put()

The [1] has some threading overhead, but probably insignificant
because the thread can sleep 99% time and wake up every 20 mins (by default).
Anyway current implementation is [2].

Change-Id: Ie8781d10549a044656824ceb78b2fe2e4f7f8b43
Kirill Bespalov 7 years ago
parent afd5f8233c
commit 162f6e987b

@ -25,10 +25,13 @@ import six
from oslo_messaging import exceptions
base_opts = [
cfg.IntOpt('rpc_conn_pool_size', default=30,
help='Size of RPC connection pool.'),
cfg.IntOpt('conn_pool_min_size', default=2,
help='The pool size limit for connections expiration policy'),
cfg.IntOpt('conn_pool_ttl', default=1200,
help='The time-to-live in sec of idle connections in the pool')

@ -50,6 +50,12 @@ kafka_opts = [
cfg.IntOpt('pool_size', default=10,
help='Pool Size for Kafka Consumers'),
cfg.IntOpt('conn_pool_min_size', default=2,
help='The pool size limit for connections expiration policy'),
cfg.IntOpt('conn_pool_ttl', default=1200,
help='The time-to-live in sec of idle connections in the pool')
@ -301,8 +307,13 @@ class KafkaDriver(base.BaseDriver):
super(KafkaDriver, self).__init__(
conf, url, default_exchange, allowed_remote_exmods)
# the pool configuration properties
max_size = self.conf.oslo_messaging_kafka.pool_size
min_size = self.conf.oslo_messaging_kafka.conn_pool_min_size
ttl = self.conf.oslo_messaging_kafka.conn_pool_ttl
self.connection_pool = driver_pool.ConnectionPool(
self.conf, self.conf.oslo_messaging_kafka.pool_size,
self.conf, max_size, min_size, ttl,
self._url, Connection)
self.listeners = []

@ -1321,8 +1321,13 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
self.prefetch_size = (
# the pool configuration properties
max_size = conf.oslo_messaging_rabbit.rpc_conn_pool_size
min_size = conf.oslo_messaging_rabbit.conn_pool_min_size
ttl = conf.oslo_messaging_rabbit.conn_pool_ttl
connection_pool = pool.ConnectionPool(
conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,
conf, max_size, min_size, ttl,
url, Connection)
super(RabbitDriver, self).__init__(

@ -1,4 +1,3 @@
# Copyright 2013 Red Hat, Inc.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -19,6 +18,7 @@ import sys
import threading
from oslo_log import log as logging
from oslo_utils import timeutils
import six
from oslo_messaging._drivers import common
@ -38,28 +38,48 @@ else:
class Pool(object):
"""A thread-safe object pool.
Modelled after the eventlet.pools.Pool interface, but designed to be safe
when using native threads without the GIL.
Resizing is not supported.
def __init__(self, max_size=4):
def __init__(self, max_size=4, min_size=2, ttl=1200, on_expire=None):
super(Pool, self).__init__()
self._min_size = min_size
self._max_size = max_size
self._item_ttl = ttl
self._current_size = 0
self._cond = threading.Condition()
self._items = collections.deque()
self._on_expire = on_expire
def expire(self):
"""Remove expired items from left (the oldest item) to
right (the newest item).
with self._cond:
while len(self._items) > self._min_size:
ttl_watch, item = self._items.popleft()
if ttl_watch.expired():
self._on_expire and self._on_expire(item)
self._current_size -= 1
self._items.appendleft((ttl_watch, item))
except IndexError:
def put(self, item):
"""Return an item to the pool."""
with self._cond:
ttl_watch = timeutils.StopWatch(duration=self._item_ttl)
self._items.append((ttl_watch, item))
def get(self):
@ -70,7 +90,9 @@ class Pool(object):
with self._cond:
while True:
return self._items.popleft()
ttl_watch, item = self._items.pop()
return item
except IndexError:
@ -90,12 +112,12 @@ class Pool(object):
def iter_free(self):
"""Iterate over free items."""
with self._cond:
while True:
yield self._items.popleft()
except IndexError:
while True:
_, item = self._items.pop()
yield item
except IndexError:
raise StopIteration
def create(self):
@ -104,17 +126,20 @@ class Pool(object):
class ConnectionPool(Pool):
"""Class that implements a Pool of Connections."""
def __init__(self, conf, rpc_conn_pool_size, url, connection_cls):
def __init__(self, conf, max_size, min_size, ttl, url, connection_cls):
self.connection_cls = connection_cls
self.conf = conf
self.url = url
super(ConnectionPool, self).__init__(rpc_conn_pool_size)
self.reply_proxy = None
super(ConnectionPool, self).__init__(max_size, min_size, ttl,
def _on_expire(self, connection):
LOG.debug("Idle connection has expired and been closed."
" Pool size: %d" % len(self._items))
# TODO(comstud): Timeout connections not used in a while
def create(self, purpose=None):
if purpose is None:
purpose = common.PURPOSE_SEND
def create(self, purpose=common.PURPOSE_SEND):
LOG.debug('Pool creating new connection')
return self.connection_cls(self.conf, self.url, purpose)

@ -0,0 +1,8 @@
- |
| Idle connections in the pool will be expired and closed.
| Default ttl is 1200s. Next configuration params was added
* *conn_pool_ttl* (defaul 1200)
* *conn_pool_min_size* (default 2)