Separate the thread pool for health and stats update

When queue_event_streamer driver is used and RabbitMQ
is down, stats update processes occupy the thread pool
which is shared with health update processes. Then,
RabbitMQ down unexpectedly leads to delete all existing
amphorae. This commit separates the thread pool and aims
to keep the existing amphorae working even when RabbitMQ
is down.

In addition to the cherry-pick from master, this patch fixes the release
note to not call out a feature but a fix instead, and drop the
deprecation of status_update_threads.

Change-Id: I576687f5b646496ff3a00787cf5e8c27f36b9448
Task: 22929
Story: 2002937
(cherry picked from commit ad69363fc7)
This commit is contained in:
Tatsuma Matsuki 2018-07-11 15:05:05 +09:00 committed by Carlos Goncalves
parent cc85cede24
commit ddb948a5ad
6 changed files with 43 additions and 12 deletions

View File

@ -74,8 +74,14 @@
# controller_ip_port_list example: 127.0.0.1:5555, 127.0.0.1:5555
# controller_ip_port_list =
# failover_threads = 10
# status_update_threads will default to the number of processors on the host
# status_update_threads will default to the number of processors on the host.
# If you specify health_update_threads and stats_update_threads, they override
# this parameter.
# status_update_threads =
# health_update_threads will default to the number of processors on the host
# health_update_threads =
# stats_update_threads will default to the number of processors on the host
# stats_update_threads =
# heartbeat_interval = 10
# heartbeat_key =
# heartbeat_timeout = 60

View File

@ -63,8 +63,10 @@ class UDPStatusGetter(object):
self.sock = None
self.update(self.key, self.ip, self.port)
self.executor = futures.ProcessPoolExecutor(
max_workers=cfg.CONF.health_manager.status_update_threads)
self.health_executor = futures.ProcessPoolExecutor(
max_workers=CONF.health_manager.health_update_threads)
self.stats_executor = futures.ProcessPoolExecutor(
max_workers=CONF.health_manager.stats_update_threads)
self.repo = repositories.Repositories().amphorahealth
def update(self, key, ip, port):
@ -209,5 +211,5 @@ class UDPStatusGetter(object):
'heartbeat packet. Ignoring this packet. '
'Exception: %s', e)
else:
self.executor.submit(update_health, obj, srcaddr)
self.executor.submit(update_stats, obj, srcaddr)
self.health_executor.submit(update_health, obj, srcaddr)
self.stats_executor.submit(update_stats, obj, srcaddr)

View File

@ -50,7 +50,8 @@ def hm_listener(exit_event):
LOG.error('Health Manager listener experienced unknown error: %s',
e)
LOG.info('Waiting for executor to shutdown...')
udp_getter.executor.shutdown()
udp_getter.health_executor.shutdown()
udp_getter.stats_executor.shutdown()
LOG.info('Executor shutdown finished.')

View File

@ -186,6 +186,12 @@ healthmanager_opts = [
cfg.IntOpt('status_update_threads',
default=None,
help=_('Number of processes for amphora status update.')),
cfg.IntOpt('health_update_threads',
default=None,
help=_('Number of processes for amphora health update.')),
cfg.IntOpt('stats_update_threads',
default=None,
help=_('Number of processes for amphora stats update.')),
cfg.StrOpt('heartbeat_key',
help=_('key used to validate amphora sending'
'the message'), secret=True),
@ -659,3 +665,10 @@ def handle_deprecation_compatibility():
if cfg.CONF.api_handler is not None:
cfg.CONF.set_default('api_handler', cfg.CONF.api_handler,
group='api_settings')
if cfg.CONF.health_manager.status_update_threads is not None:
cfg.CONF.set_default('health_update_threads',
cfg.CONF.health_manager.status_update_threads,
group='health_manager')
cfg.CONF.set_default('stats_update_threads',
cfg.CONF.health_manager.status_update_threads,
group='health_manager')

View File

@ -155,18 +155,22 @@ class TestHeartbeatUDP(base.TestCase):
mock_socket.return_value = socket_mock
mock_getaddrinfo.return_value = [range(1, 6)]
mock_dorecv = mock.Mock()
mock_executor = mock.Mock()
mock_health_executor = mock.Mock()
mock_stats_executor = mock.Mock()
getter = heartbeat_udp.UDPStatusGetter()
getter.dorecv = mock_dorecv
mock_dorecv.side_effect = [(dict(id=FAKE_ID), 2)]
getter.executor = mock_executor
getter.health_executor = mock_health_executor
getter.stats_executor = mock_stats_executor
getter.check()
getter.executor.shutdown()
mock_executor.submit.assert_has_calls(
[mock.call(heartbeat_udp.update_health, {'id': 1}, 2),
mock.call(heartbeat_udp.update_stats, {'id': 1}, 2)])
getter.health_executor.shutdown()
getter.stats_executor.shutdown()
mock_health_executor.submit.assert_has_calls(
[mock.call(heartbeat_udp.update_health, {'id': 1}, 2)])
mock_stats_executor.submit.assert_has_calls(
[mock.call(heartbeat_udp.update_stats, {'id': 1}, 2)])
@mock.patch('socket.getaddrinfo')
@mock.patch('socket.socket')

View File

@ -0,0 +1,5 @@
---
fixes:
- |
Add new parameters to specify the number of threads for updating amphora
health and stats.