diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py index d21320998..24d703f41 100644 --- a/oslo_messaging/_drivers/base.py +++ b/oslo_messaging/_drivers/base.py @@ -25,10 +25,13 @@ import six from oslo_messaging import exceptions base_opts = [ - cfg.IntOpt('rpc_conn_pool_size', - default=30, + cfg.IntOpt('rpc_conn_pool_size', default=30, deprecated_group='DEFAULT', help='Size of RPC connection pool.'), + cfg.IntOpt('conn_pool_min_size', default=2, + help='The pool size limit for connections expiration policy'), + cfg.IntOpt('conn_pool_ttl', default=1200, + help='The time-to-live in sec of idle connections in the pool') ] diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index e7c364700..b448fcdbb 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -50,6 +50,12 @@ kafka_opts = [ cfg.IntOpt('pool_size', default=10, help='Pool Size for Kafka Consumers'), + + cfg.IntOpt('conn_pool_min_size', default=2, + help='The pool size limit for connections expiration policy'), + + cfg.IntOpt('conn_pool_ttl', default=1200, + help='The time-to-live in sec of idle connections in the pool') ] CONF = cfg.CONF @@ -301,8 +307,13 @@ class KafkaDriver(base.BaseDriver): super(KafkaDriver, self).__init__( conf, url, default_exchange, allowed_remote_exmods) + # the pool configuration properties + max_size = self.conf.oslo_messaging_kafka.pool_size + min_size = self.conf.oslo_messaging_kafka.conn_pool_min_size + ttl = self.conf.oslo_messaging_kafka.conn_pool_ttl + self.connection_pool = driver_pool.ConnectionPool( - self.conf, self.conf.oslo_messaging_kafka.pool_size, + self.conf, max_size, min_size, ttl, self._url, Connection) self.listeners = [] diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index a0f9ddf24..50a7d14fc 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -1325,8 +1325,13 @@ class RabbitDriver(amqpdriver.AMQPDriverBase): self.prefetch_size = ( conf.oslo_messaging_rabbit.rabbit_qos_prefetch_count) + # the pool configuration properties + max_size = conf.oslo_messaging_rabbit.rpc_conn_pool_size + min_size = conf.oslo_messaging_rabbit.conn_pool_min_size + ttl = conf.oslo_messaging_rabbit.conn_pool_ttl + connection_pool = pool.ConnectionPool( - conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size, + conf, max_size, min_size, ttl, url, Connection) super(RabbitDriver, self).__init__( diff --git a/oslo_messaging/_drivers/pool.py b/oslo_messaging/_drivers/pool.py index 1af1e3145..681dbef1e 100644 --- a/oslo_messaging/_drivers/pool.py +++ b/oslo_messaging/_drivers/pool.py @@ -1,4 +1,3 @@ - # Copyright 2013 Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -19,6 +18,7 @@ import sys import threading from oslo_log import log as logging +from oslo_utils import timeutils import six from oslo_messaging._drivers import common @@ -38,28 +38,48 @@ else: @six.add_metaclass(abc.ABCMeta) class Pool(object): - """A thread-safe object pool. Modelled after the eventlet.pools.Pool interface, but designed to be safe when using native threads without the GIL. Resizing is not supported. + """ - def __init__(self, max_size=4): + def __init__(self, max_size=4, min_size=2, ttl=1200, on_expire=None): super(Pool, self).__init__() - + self._min_size = min_size self._max_size = max_size + self._item_ttl = ttl self._current_size = 0 self._cond = threading.Condition() - self._items = collections.deque() + self._on_expire = on_expire + + def expire(self): + """Remove expired items from left (the oldest item) to + right (the newest item). + """ + with self._cond: + while len(self._items) > self._min_size: + try: + ttl_watch, item = self._items.popleft() + if ttl_watch.expired(): + self._on_expire and self._on_expire(item) + self._current_size -= 1 + else: + self._items.appendleft((ttl_watch, item)) + return + except IndexError: + break def put(self, item): """Return an item to the pool.""" with self._cond: - self._items.appendleft(item) + ttl_watch = timeutils.StopWatch(duration=self._item_ttl) + ttl_watch.start() + self._items.append((ttl_watch, item)) self._cond.notify() def get(self): @@ -70,7 +90,9 @@ class Pool(object): with self._cond: while True: try: - return self._items.popleft() + ttl_watch, item = self._items.pop() + self.expire() + return item except IndexError: pass @@ -90,12 +112,12 @@ class Pool(object): def iter_free(self): """Iterate over free items.""" - with self._cond: - while True: - try: - yield self._items.popleft() - except IndexError: - break + while True: + try: + _, item = self._items.pop() + yield item + except IndexError: + raise StopIteration @abc.abstractmethod def create(self): @@ -104,17 +126,20 @@ class Pool(object): class ConnectionPool(Pool): """Class that implements a Pool of Connections.""" - def __init__(self, conf, rpc_conn_pool_size, url, connection_cls): + + def __init__(self, conf, max_size, min_size, ttl, url, connection_cls): self.connection_cls = connection_cls self.conf = conf self.url = url - super(ConnectionPool, self).__init__(rpc_conn_pool_size) - self.reply_proxy = None + super(ConnectionPool, self).__init__(max_size, min_size, ttl, + self._on_expire) - # TODO(comstud): Timeout connections not used in a while - def create(self, purpose=None): - if purpose is None: - purpose = common.PURPOSE_SEND + def _on_expire(self, connection): + connection.close() + LOG.debug("Idle connection has expired and been closed." + " Pool size: %d" % len(self._items)) + + def create(self, purpose=common.PURPOSE_SEND): LOG.debug('Pool creating new connection') return self.connection_cls(self.conf, self.url, purpose) diff --git a/releasenotes/notes/connection_ttl-2cf0fe6e1ab8c73c.yaml b/releasenotes/notes/connection_ttl-2cf0fe6e1ab8c73c.yaml new file mode 100644 index 000000000..fafa33da2 --- /dev/null +++ b/releasenotes/notes/connection_ttl-2cf0fe6e1ab8c73c.yaml @@ -0,0 +1,8 @@ +--- +features: + - | + | Idle connections in the pool will be expired and closed. + | Default ttl is 1200s. Next configuration params was added + + * *conn_pool_ttl* (defaul 1200) + * *conn_pool_min_size* (default 2)