From 162f6e987b92d609b0e6187ddfc1f40dca88e400 Mon Sep 17 00:00:00 2001 From: Kirill Bespalov Date: Mon, 11 Jul 2016 15:25:23 +0300 Subject: [PATCH] Introduce TTL for idle connections We can reduce a workload of rabbitmq through implementation of expiration mechanism for idle connections in the pool with next properties: conn_pool_ttl (default 20 min) conn_pool_min_size: the pool size limit for expire() (default 2) The problem is timeless idle connections in the pool, which can be created via some single huge workload of RPCServer. One SEND connection is heartbeat thread + some network activity every n second. So, we can reduce it. Here is two ways to implement an expiration: [1] Create a separated thread for checking expire date of connections [2] Make call expire() on pool.get() or pool.put() The [1] has some threading overhead, but probably insignificant because the thread can sleep 99% time and wake up every 20 mins (by default). Anyway current implementation is [2]. Change-Id: Ie8781d10549a044656824ceb78b2fe2e4f7f8b43 --- oslo_messaging/_drivers/base.py | 7 +- oslo_messaging/_drivers/impl_kafka.py | 13 +++- oslo_messaging/_drivers/impl_rabbit.py | 7 +- oslo_messaging/_drivers/pool.py | 65 +++++++++++++------ .../connection_ttl-2cf0fe6e1ab8c73c.yaml | 8 +++ 5 files changed, 76 insertions(+), 24 deletions(-) create mode 100644 releasenotes/notes/connection_ttl-2cf0fe6e1ab8c73c.yaml diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py index d21320998..24d703f41 100644 --- a/oslo_messaging/_drivers/base.py +++ b/oslo_messaging/_drivers/base.py @@ -25,10 +25,13 @@ import six from oslo_messaging import exceptions base_opts = [ - cfg.IntOpt('rpc_conn_pool_size', - default=30, + cfg.IntOpt('rpc_conn_pool_size', default=30, deprecated_group='DEFAULT', help='Size of RPC connection pool.'), + cfg.IntOpt('conn_pool_min_size', default=2, + help='The pool size limit for connections expiration policy'), + cfg.IntOpt('conn_pool_ttl', default=1200, + help='The time-to-live in sec of idle connections in the pool') ] diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index e7c364700..b448fcdbb 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -50,6 +50,12 @@ kafka_opts = [ cfg.IntOpt('pool_size', default=10, help='Pool Size for Kafka Consumers'), + + cfg.IntOpt('conn_pool_min_size', default=2, + help='The pool size limit for connections expiration policy'), + + cfg.IntOpt('conn_pool_ttl', default=1200, + help='The time-to-live in sec of idle connections in the pool') ] CONF = cfg.CONF @@ -301,8 +307,13 @@ class KafkaDriver(base.BaseDriver): super(KafkaDriver, self).__init__( conf, url, default_exchange, allowed_remote_exmods) + # the pool configuration properties + max_size = self.conf.oslo_messaging_kafka.pool_size + min_size = self.conf.oslo_messaging_kafka.conn_pool_min_size + ttl = self.conf.oslo_messaging_kafka.conn_pool_ttl + self.connection_pool = driver_pool.ConnectionPool( - self.conf, self.conf.oslo_messaging_kafka.pool_size, + self.conf, max_size, min_size, ttl, self._url, Connection) self.listeners = [] diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index f788bba51..936a837f6 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -1321,8 +1321,13 @@ class RabbitDriver(amqpdriver.AMQPDriverBase): self.prefetch_size = ( conf.oslo_messaging_rabbit.rabbit_qos_prefetch_count) + # the pool configuration properties + max_size = conf.oslo_messaging_rabbit.rpc_conn_pool_size + min_size = conf.oslo_messaging_rabbit.conn_pool_min_size + ttl = conf.oslo_messaging_rabbit.conn_pool_ttl + connection_pool = pool.ConnectionPool( - conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size, + conf, max_size, min_size, ttl, url, Connection) super(RabbitDriver, self).__init__( diff --git a/oslo_messaging/_drivers/pool.py b/oslo_messaging/_drivers/pool.py index 1af1e3145..681dbef1e 100644 --- a/oslo_messaging/_drivers/pool.py +++ b/oslo_messaging/_drivers/pool.py @@ -1,4 +1,3 @@ - # Copyright 2013 Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -19,6 +18,7 @@ import sys import threading from oslo_log import log as logging +from oslo_utils import timeutils import six from oslo_messaging._drivers import common @@ -38,28 +38,48 @@ else: @six.add_metaclass(abc.ABCMeta) class Pool(object): - """A thread-safe object pool. Modelled after the eventlet.pools.Pool interface, but designed to be safe when using native threads without the GIL. Resizing is not supported. + """ - def __init__(self, max_size=4): + def __init__(self, max_size=4, min_size=2, ttl=1200, on_expire=None): super(Pool, self).__init__() - + self._min_size = min_size self._max_size = max_size + self._item_ttl = ttl self._current_size = 0 self._cond = threading.Condition() - self._items = collections.deque() + self._on_expire = on_expire + + def expire(self): + """Remove expired items from left (the oldest item) to + right (the newest item). + """ + with self._cond: + while len(self._items) > self._min_size: + try: + ttl_watch, item = self._items.popleft() + if ttl_watch.expired(): + self._on_expire and self._on_expire(item) + self._current_size -= 1 + else: + self._items.appendleft((ttl_watch, item)) + return + except IndexError: + break def put(self, item): """Return an item to the pool.""" with self._cond: - self._items.appendleft(item) + ttl_watch = timeutils.StopWatch(duration=self._item_ttl) + ttl_watch.start() + self._items.append((ttl_watch, item)) self._cond.notify() def get(self): @@ -70,7 +90,9 @@ class Pool(object): with self._cond: while True: try: - return self._items.popleft() + ttl_watch, item = self._items.pop() + self.expire() + return item except IndexError: pass @@ -90,12 +112,12 @@ class Pool(object): def iter_free(self): """Iterate over free items.""" - with self._cond: - while True: - try: - yield self._items.popleft() - except IndexError: - break + while True: + try: + _, item = self._items.pop() + yield item + except IndexError: + raise StopIteration @abc.abstractmethod def create(self): @@ -104,17 +126,20 @@ class Pool(object): class ConnectionPool(Pool): """Class that implements a Pool of Connections.""" - def __init__(self, conf, rpc_conn_pool_size, url, connection_cls): + + def __init__(self, conf, max_size, min_size, ttl, url, connection_cls): self.connection_cls = connection_cls self.conf = conf self.url = url - super(ConnectionPool, self).__init__(rpc_conn_pool_size) - self.reply_proxy = None + super(ConnectionPool, self).__init__(max_size, min_size, ttl, + self._on_expire) - # TODO(comstud): Timeout connections not used in a while - def create(self, purpose=None): - if purpose is None: - purpose = common.PURPOSE_SEND + def _on_expire(self, connection): + connection.close() + LOG.debug("Idle connection has expired and been closed." + " Pool size: %d" % len(self._items)) + + def create(self, purpose=common.PURPOSE_SEND): LOG.debug('Pool creating new connection') return self.connection_cls(self.conf, self.url, purpose) diff --git a/releasenotes/notes/connection_ttl-2cf0fe6e1ab8c73c.yaml b/releasenotes/notes/connection_ttl-2cf0fe6e1ab8c73c.yaml new file mode 100644 index 000000000..fafa33da2 --- /dev/null +++ b/releasenotes/notes/connection_ttl-2cf0fe6e1ab8c73c.yaml @@ -0,0 +1,8 @@ +--- +features: + - | + | Idle connections in the pool will be expired and closed. + | Default ttl is 1200s. Next configuration params was added + + * *conn_pool_ttl* (defaul 1200) + * *conn_pool_min_size* (default 2)