From ddb948a5ad273c9bf18c3b0ca9d3db524154e636 Mon Sep 17 00:00:00 2001 From: Tatsuma Matsuki Date: Wed, 11 Jul 2018 15:05:05 +0900 Subject: [PATCH] Separate the thread pool for health and stats update When queue_event_streamer driver is used and RabbitMQ is down, stats update processes occupy the thread pool which is shared with health update processes. Then, RabbitMQ down unexpectedly leads to delete all existing amphorae. This commit separates the thread pool and aims to keep the existing amphorae working even when RabbitMQ is down. In addition to the cherry-pick from master, this patch fixes the release note to not call out a feature but a fix instead, and drop the deprecation of status_update_threads. Change-Id: I576687f5b646496ff3a00787cf5e8c27f36b9448 Task: 22929 Story: 2002937 (cherry picked from commit ad69363fc753c0689214fcdc26d6ef6c90d0d520) --- etc/octavia.conf | 8 +++++++- octavia/amphorae/drivers/health/heartbeat_udp.py | 10 ++++++---- octavia/cmd/health_manager.py | 3 ++- octavia/common/config.py | 13 +++++++++++++ .../drivers/health/test_heartbeat_udp.py | 16 ++++++++++------ ...for-health-stats-update-c263c844075a7721.yaml | 5 +++++ 6 files changed, 43 insertions(+), 12 deletions(-) create mode 100644 releasenotes/notes/separate-thread-pool-for-health-stats-update-c263c844075a7721.yaml diff --git a/etc/octavia.conf b/etc/octavia.conf index ec8900280e..676cfe7d37 100644 --- a/etc/octavia.conf +++ b/etc/octavia.conf @@ -74,8 +74,14 @@ # controller_ip_port_list example: 127.0.0.1:5555, 127.0.0.1:5555 # controller_ip_port_list = # failover_threads = 10 -# status_update_threads will default to the number of processors on the host +# status_update_threads will default to the number of processors on the host. +# If you specify health_update_threads and stats_update_threads, they override +# this parameter. # status_update_threads = +# health_update_threads will default to the number of processors on the host +# health_update_threads = +# stats_update_threads will default to the number of processors on the host +# stats_update_threads = # heartbeat_interval = 10 # heartbeat_key = # heartbeat_timeout = 60 diff --git a/octavia/amphorae/drivers/health/heartbeat_udp.py b/octavia/amphorae/drivers/health/heartbeat_udp.py index c4b07cdbf3..71c86b8ee0 100644 --- a/octavia/amphorae/drivers/health/heartbeat_udp.py +++ b/octavia/amphorae/drivers/health/heartbeat_udp.py @@ -63,8 +63,10 @@ class UDPStatusGetter(object): self.sock = None self.update(self.key, self.ip, self.port) - self.executor = futures.ProcessPoolExecutor( - max_workers=cfg.CONF.health_manager.status_update_threads) + self.health_executor = futures.ProcessPoolExecutor( + max_workers=CONF.health_manager.health_update_threads) + self.stats_executor = futures.ProcessPoolExecutor( + max_workers=CONF.health_manager.stats_update_threads) self.repo = repositories.Repositories().amphorahealth def update(self, key, ip, port): @@ -209,5 +211,5 @@ class UDPStatusGetter(object): 'heartbeat packet. Ignoring this packet. ' 'Exception: %s', e) else: - self.executor.submit(update_health, obj, srcaddr) - self.executor.submit(update_stats, obj, srcaddr) + self.health_executor.submit(update_health, obj, srcaddr) + self.stats_executor.submit(update_stats, obj, srcaddr) diff --git a/octavia/cmd/health_manager.py b/octavia/cmd/health_manager.py index 1ed7cef7a3..547e205c30 100644 --- a/octavia/cmd/health_manager.py +++ b/octavia/cmd/health_manager.py @@ -50,7 +50,8 @@ def hm_listener(exit_event): LOG.error('Health Manager listener experienced unknown error: %s', e) LOG.info('Waiting for executor to shutdown...') - udp_getter.executor.shutdown() + udp_getter.health_executor.shutdown() + udp_getter.stats_executor.shutdown() LOG.info('Executor shutdown finished.') diff --git a/octavia/common/config.py b/octavia/common/config.py index 41889a5326..7d7fbdcd5c 100644 --- a/octavia/common/config.py +++ b/octavia/common/config.py @@ -186,6 +186,12 @@ healthmanager_opts = [ cfg.IntOpt('status_update_threads', default=None, help=_('Number of processes for amphora status update.')), + cfg.IntOpt('health_update_threads', + default=None, + help=_('Number of processes for amphora health update.')), + cfg.IntOpt('stats_update_threads', + default=None, + help=_('Number of processes for amphora stats update.')), cfg.StrOpt('heartbeat_key', help=_('key used to validate amphora sending' 'the message'), secret=True), @@ -659,3 +665,10 @@ def handle_deprecation_compatibility(): if cfg.CONF.api_handler is not None: cfg.CONF.set_default('api_handler', cfg.CONF.api_handler, group='api_settings') + if cfg.CONF.health_manager.status_update_threads is not None: + cfg.CONF.set_default('health_update_threads', + cfg.CONF.health_manager.status_update_threads, + group='health_manager') + cfg.CONF.set_default('stats_update_threads', + cfg.CONF.health_manager.status_update_threads, + group='health_manager') diff --git a/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py b/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py index f58b27c551..04a56d27e4 100644 --- a/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py +++ b/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py @@ -155,18 +155,22 @@ class TestHeartbeatUDP(base.TestCase): mock_socket.return_value = socket_mock mock_getaddrinfo.return_value = [range(1, 6)] mock_dorecv = mock.Mock() - mock_executor = mock.Mock() + mock_health_executor = mock.Mock() + mock_stats_executor = mock.Mock() getter = heartbeat_udp.UDPStatusGetter() getter.dorecv = mock_dorecv mock_dorecv.side_effect = [(dict(id=FAKE_ID), 2)] - getter.executor = mock_executor + getter.health_executor = mock_health_executor + getter.stats_executor = mock_stats_executor getter.check() - getter.executor.shutdown() - mock_executor.submit.assert_has_calls( - [mock.call(heartbeat_udp.update_health, {'id': 1}, 2), - mock.call(heartbeat_udp.update_stats, {'id': 1}, 2)]) + getter.health_executor.shutdown() + getter.stats_executor.shutdown() + mock_health_executor.submit.assert_has_calls( + [mock.call(heartbeat_udp.update_health, {'id': 1}, 2)]) + mock_stats_executor.submit.assert_has_calls( + [mock.call(heartbeat_udp.update_stats, {'id': 1}, 2)]) @mock.patch('socket.getaddrinfo') @mock.patch('socket.socket') diff --git a/releasenotes/notes/separate-thread-pool-for-health-stats-update-c263c844075a7721.yaml b/releasenotes/notes/separate-thread-pool-for-health-stats-update-c263c844075a7721.yaml new file mode 100644 index 0000000000..09acc10704 --- /dev/null +++ b/releasenotes/notes/separate-thread-pool-for-health-stats-update-c263c844075a7721.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + Add new parameters to specify the number of threads for updating amphora + health and stats.