diff --git a/etc/octavia.conf b/etc/octavia.conf index ec8900280e..676cfe7d37 100644 --- a/etc/octavia.conf +++ b/etc/octavia.conf @@ -74,8 +74,14 @@ # controller_ip_port_list example: 127.0.0.1:5555, 127.0.0.1:5555 # controller_ip_port_list = # failover_threads = 10 -# status_update_threads will default to the number of processors on the host +# status_update_threads will default to the number of processors on the host. +# If you specify health_update_threads and stats_update_threads, they override +# this parameter. # status_update_threads = +# health_update_threads will default to the number of processors on the host +# health_update_threads = +# stats_update_threads will default to the number of processors on the host +# stats_update_threads = # heartbeat_interval = 10 # heartbeat_key = # heartbeat_timeout = 60 diff --git a/octavia/amphorae/drivers/health/heartbeat_udp.py b/octavia/amphorae/drivers/health/heartbeat_udp.py index c4b07cdbf3..71c86b8ee0 100644 --- a/octavia/amphorae/drivers/health/heartbeat_udp.py +++ b/octavia/amphorae/drivers/health/heartbeat_udp.py @@ -63,8 +63,10 @@ class UDPStatusGetter(object): self.sock = None self.update(self.key, self.ip, self.port) - self.executor = futures.ProcessPoolExecutor( - max_workers=cfg.CONF.health_manager.status_update_threads) + self.health_executor = futures.ProcessPoolExecutor( + max_workers=CONF.health_manager.health_update_threads) + self.stats_executor = futures.ProcessPoolExecutor( + max_workers=CONF.health_manager.stats_update_threads) self.repo = repositories.Repositories().amphorahealth def update(self, key, ip, port): @@ -209,5 +211,5 @@ class UDPStatusGetter(object): 'heartbeat packet. Ignoring this packet. ' 'Exception: %s', e) else: - self.executor.submit(update_health, obj, srcaddr) - self.executor.submit(update_stats, obj, srcaddr) + self.health_executor.submit(update_health, obj, srcaddr) + self.stats_executor.submit(update_stats, obj, srcaddr) diff --git a/octavia/cmd/health_manager.py b/octavia/cmd/health_manager.py index 1ed7cef7a3..547e205c30 100644 --- a/octavia/cmd/health_manager.py +++ b/octavia/cmd/health_manager.py @@ -50,7 +50,8 @@ def hm_listener(exit_event): LOG.error('Health Manager listener experienced unknown error: %s', e) LOG.info('Waiting for executor to shutdown...') - udp_getter.executor.shutdown() + udp_getter.health_executor.shutdown() + udp_getter.stats_executor.shutdown() LOG.info('Executor shutdown finished.') diff --git a/octavia/common/config.py b/octavia/common/config.py index 41889a5326..7d7fbdcd5c 100644 --- a/octavia/common/config.py +++ b/octavia/common/config.py @@ -186,6 +186,12 @@ healthmanager_opts = [ cfg.IntOpt('status_update_threads', default=None, help=_('Number of processes for amphora status update.')), + cfg.IntOpt('health_update_threads', + default=None, + help=_('Number of processes for amphora health update.')), + cfg.IntOpt('stats_update_threads', + default=None, + help=_('Number of processes for amphora stats update.')), cfg.StrOpt('heartbeat_key', help=_('key used to validate amphora sending' 'the message'), secret=True), @@ -659,3 +665,10 @@ def handle_deprecation_compatibility(): if cfg.CONF.api_handler is not None: cfg.CONF.set_default('api_handler', cfg.CONF.api_handler, group='api_settings') + if cfg.CONF.health_manager.status_update_threads is not None: + cfg.CONF.set_default('health_update_threads', + cfg.CONF.health_manager.status_update_threads, + group='health_manager') + cfg.CONF.set_default('stats_update_threads', + cfg.CONF.health_manager.status_update_threads, + group='health_manager') diff --git a/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py b/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py index f58b27c551..04a56d27e4 100644 --- a/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py +++ b/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py @@ -155,18 +155,22 @@ class TestHeartbeatUDP(base.TestCase): mock_socket.return_value = socket_mock mock_getaddrinfo.return_value = [range(1, 6)] mock_dorecv = mock.Mock() - mock_executor = mock.Mock() + mock_health_executor = mock.Mock() + mock_stats_executor = mock.Mock() getter = heartbeat_udp.UDPStatusGetter() getter.dorecv = mock_dorecv mock_dorecv.side_effect = [(dict(id=FAKE_ID), 2)] - getter.executor = mock_executor + getter.health_executor = mock_health_executor + getter.stats_executor = mock_stats_executor getter.check() - getter.executor.shutdown() - mock_executor.submit.assert_has_calls( - [mock.call(heartbeat_udp.update_health, {'id': 1}, 2), - mock.call(heartbeat_udp.update_stats, {'id': 1}, 2)]) + getter.health_executor.shutdown() + getter.stats_executor.shutdown() + mock_health_executor.submit.assert_has_calls( + [mock.call(heartbeat_udp.update_health, {'id': 1}, 2)]) + mock_stats_executor.submit.assert_has_calls( + [mock.call(heartbeat_udp.update_stats, {'id': 1}, 2)]) @mock.patch('socket.getaddrinfo') @mock.patch('socket.socket') diff --git a/releasenotes/notes/separate-thread-pool-for-health-stats-update-c263c844075a7721.yaml b/releasenotes/notes/separate-thread-pool-for-health-stats-update-c263c844075a7721.yaml new file mode 100644 index 0000000000..09acc10704 --- /dev/null +++ b/releasenotes/notes/separate-thread-pool-for-health-stats-update-c263c844075a7721.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + Add new parameters to specify the number of threads for updating amphora + health and stats.