Separate the thread pool for health and stats update

When queue_event_streamer driver is used and RabbitMQ
is down, stats update processes occupy the thread pool
which is shared with health update processes. Then,
RabbitMQ down unexpectedly leads to delete all existing
amphorae. This commit separates the thread pool and aims
to keep the existing amphorae working even when RabbitMQ
is down.

Change-Id: I576687f5b646496ff3a00787cf5e8c27f36b9448
Task: 22929
Story: 2002937
This commit is contained in:
Tatsuma Matsuki 2018-07-11 15:05:05 +09:00
parent 4d867f623d
commit ad69363fc7
6 changed files with 55 additions and 13 deletions

View File

@ -74,8 +74,14 @@
# controller_ip_port_list example: 127.0.0.1:5555, 127.0.0.1:5555
# controller_ip_port_list =
# failover_threads = 10
# status_update_threads will default to the number of processors on the host
# status_update_threads will default to the number of processors on the host.
# This setting is deprecated and if you specify health_update_threads and
# stats_update_threads, they override this parameter.
# status_update_threads =
# health_update_threads will default to the number of processors on the host
# health_update_threads =
# stats_update_threads will default to the number of processors on the host
# stats_update_threads =
# heartbeat_interval = 10
# heartbeat_key =
# heartbeat_timeout = 60

View File

@ -63,8 +63,10 @@ class UDPStatusGetter(object):
self.sock = None
self.update(self.key, self.ip, self.port)
self.executor = futures.ProcessPoolExecutor(
max_workers=cfg.CONF.health_manager.status_update_threads)
self.health_executor = futures.ProcessPoolExecutor(
max_workers=CONF.health_manager.health_update_threads)
self.stats_executor = futures.ProcessPoolExecutor(
max_workers=CONF.health_manager.stats_update_threads)
self.repo = repositories.Repositories().amphorahealth
def update(self, key, ip, port):
@ -209,5 +211,5 @@ class UDPStatusGetter(object):
'heartbeat packet. Ignoring this packet. '
'Exception: %s', e)
else:
self.executor.submit(update_health, obj, srcaddr)
self.executor.submit(update_stats, obj, srcaddr)
self.health_executor.submit(update_health, obj, srcaddr)
self.stats_executor.submit(update_stats, obj, srcaddr)

View File

@ -50,7 +50,8 @@ def hm_listener(exit_event):
LOG.error('Health Manager listener experienced unknown error: %s',
e)
LOG.info('Waiting for executor to shutdown...')
udp_getter.executor.shutdown()
udp_getter.health_executor.shutdown()
udp_getter.stats_executor.shutdown()
LOG.info('Executor shutdown finished.')

View File

@ -183,9 +183,20 @@ healthmanager_opts = [
cfg.IntOpt('failover_threads',
default=10,
help=_('Number of threads performing amphora failovers.')),
# TODO(tatsuma) Remove in or after "T" release
cfg.IntOpt('status_update_threads',
default=None,
help=_('Number of processes for amphora status update.')),
help=_('Number of processes for amphora status update.'),
deprecated_for_removal=True,
deprecated_reason=_('This option is replaced as '
'health_update_threads and '
'stats_update_threads')),
cfg.IntOpt('health_update_threads',
default=None,
help=_('Number of processes for amphora health update.')),
cfg.IntOpt('stats_update_threads',
default=None,
help=_('Number of processes for amphora stats update.')),
cfg.StrOpt('heartbeat_key',
help=_('key used to validate amphora sending'
'the message'), secret=True),
@ -659,3 +670,11 @@ def handle_deprecation_compatibility():
if cfg.CONF.api_handler is not None:
cfg.CONF.set_default('api_handler', cfg.CONF.api_handler,
group='api_settings')
# TODO(tatsuma) Remove in or after "T" release
if cfg.CONF.health_manager.status_update_threads is not None:
cfg.CONF.set_default('health_update_threads',
cfg.CONF.health_manager.status_update_threads,
group='health_manager')
cfg.CONF.set_default('stats_update_threads',
cfg.CONF.health_manager.status_update_threads,
group='health_manager')

View File

@ -155,18 +155,22 @@ class TestHeartbeatUDP(base.TestCase):
mock_socket.return_value = socket_mock
mock_getaddrinfo.return_value = [range(1, 6)]
mock_dorecv = mock.Mock()
mock_executor = mock.Mock()
mock_health_executor = mock.Mock()
mock_stats_executor = mock.Mock()
getter = heartbeat_udp.UDPStatusGetter()
getter.dorecv = mock_dorecv
mock_dorecv.side_effect = [(dict(id=FAKE_ID), 2)]
getter.executor = mock_executor
getter.health_executor = mock_health_executor
getter.stats_executor = mock_stats_executor
getter.check()
getter.executor.shutdown()
mock_executor.submit.assert_has_calls(
[mock.call(heartbeat_udp.update_health, {'id': 1}, 2),
mock.call(heartbeat_udp.update_stats, {'id': 1}, 2)])
getter.health_executor.shutdown()
getter.stats_executor.shutdown()
mock_health_executor.submit.assert_has_calls(
[mock.call(heartbeat_udp.update_health, {'id': 1}, 2)])
mock_stats_executor.submit.assert_has_calls(
[mock.call(heartbeat_udp.update_stats, {'id': 1}, 2)])
@mock.patch('socket.getaddrinfo')
@mock.patch('socket.socket')

View File

@ -0,0 +1,10 @@
---
features:
- |
Add new parameters to specify the number of threads for updating amphora
health and stats.
deprecations:
- |
`status_update_threads` config option for healthmanager is deprecated
because it is replaced as `health_update_threads` and
`stats_update_threads`.