Merge "Separate the thread pool for health and stats update"
This commit is contained in:
commit
687a0e8472
|
@ -74,8 +74,14 @@
|
||||||
# controller_ip_port_list example: 127.0.0.1:5555, 127.0.0.1:5555
|
# controller_ip_port_list example: 127.0.0.1:5555, 127.0.0.1:5555
|
||||||
# controller_ip_port_list =
|
# controller_ip_port_list =
|
||||||
# failover_threads = 10
|
# failover_threads = 10
|
||||||
# status_update_threads will default to the number of processors on the host
|
# status_update_threads will default to the number of processors on the host.
|
||||||
|
# This setting is deprecated and if you specify health_update_threads and
|
||||||
|
# stats_update_threads, they override this parameter.
|
||||||
# status_update_threads =
|
# status_update_threads =
|
||||||
|
# health_update_threads will default to the number of processors on the host
|
||||||
|
# health_update_threads =
|
||||||
|
# stats_update_threads will default to the number of processors on the host
|
||||||
|
# stats_update_threads =
|
||||||
# heartbeat_interval = 10
|
# heartbeat_interval = 10
|
||||||
# heartbeat_key =
|
# heartbeat_key =
|
||||||
# heartbeat_timeout = 60
|
# heartbeat_timeout = 60
|
||||||
|
|
|
@ -63,8 +63,10 @@ class UDPStatusGetter(object):
|
||||||
self.sock = None
|
self.sock = None
|
||||||
self.update(self.key, self.ip, self.port)
|
self.update(self.key, self.ip, self.port)
|
||||||
|
|
||||||
self.executor = futures.ProcessPoolExecutor(
|
self.health_executor = futures.ProcessPoolExecutor(
|
||||||
max_workers=cfg.CONF.health_manager.status_update_threads)
|
max_workers=CONF.health_manager.health_update_threads)
|
||||||
|
self.stats_executor = futures.ProcessPoolExecutor(
|
||||||
|
max_workers=CONF.health_manager.stats_update_threads)
|
||||||
self.repo = repositories.Repositories().amphorahealth
|
self.repo = repositories.Repositories().amphorahealth
|
||||||
|
|
||||||
def update(self, key, ip, port):
|
def update(self, key, ip, port):
|
||||||
|
@ -209,5 +211,5 @@ class UDPStatusGetter(object):
|
||||||
'heartbeat packet. Ignoring this packet. '
|
'heartbeat packet. Ignoring this packet. '
|
||||||
'Exception: %s', e)
|
'Exception: %s', e)
|
||||||
else:
|
else:
|
||||||
self.executor.submit(update_health, obj, srcaddr)
|
self.health_executor.submit(update_health, obj, srcaddr)
|
||||||
self.executor.submit(update_stats, obj, srcaddr)
|
self.stats_executor.submit(update_stats, obj, srcaddr)
|
||||||
|
|
|
@ -50,7 +50,8 @@ def hm_listener(exit_event):
|
||||||
LOG.error('Health Manager listener experienced unknown error: %s',
|
LOG.error('Health Manager listener experienced unknown error: %s',
|
||||||
e)
|
e)
|
||||||
LOG.info('Waiting for executor to shutdown...')
|
LOG.info('Waiting for executor to shutdown...')
|
||||||
udp_getter.executor.shutdown()
|
udp_getter.health_executor.shutdown()
|
||||||
|
udp_getter.stats_executor.shutdown()
|
||||||
LOG.info('Executor shutdown finished.')
|
LOG.info('Executor shutdown finished.')
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -183,9 +183,20 @@ healthmanager_opts = [
|
||||||
cfg.IntOpt('failover_threads',
|
cfg.IntOpt('failover_threads',
|
||||||
default=10,
|
default=10,
|
||||||
help=_('Number of threads performing amphora failovers.')),
|
help=_('Number of threads performing amphora failovers.')),
|
||||||
|
# TODO(tatsuma) Remove in or after "T" release
|
||||||
cfg.IntOpt('status_update_threads',
|
cfg.IntOpt('status_update_threads',
|
||||||
default=None,
|
default=None,
|
||||||
help=_('Number of processes for amphora status update.')),
|
help=_('Number of processes for amphora status update.'),
|
||||||
|
deprecated_for_removal=True,
|
||||||
|
deprecated_reason=_('This option is replaced as '
|
||||||
|
'health_update_threads and '
|
||||||
|
'stats_update_threads')),
|
||||||
|
cfg.IntOpt('health_update_threads',
|
||||||
|
default=None,
|
||||||
|
help=_('Number of processes for amphora health update.')),
|
||||||
|
cfg.IntOpt('stats_update_threads',
|
||||||
|
default=None,
|
||||||
|
help=_('Number of processes for amphora stats update.')),
|
||||||
cfg.StrOpt('heartbeat_key',
|
cfg.StrOpt('heartbeat_key',
|
||||||
help=_('key used to validate amphora sending'
|
help=_('key used to validate amphora sending'
|
||||||
'the message'), secret=True),
|
'the message'), secret=True),
|
||||||
|
@ -659,3 +670,11 @@ def handle_deprecation_compatibility():
|
||||||
if cfg.CONF.api_handler is not None:
|
if cfg.CONF.api_handler is not None:
|
||||||
cfg.CONF.set_default('api_handler', cfg.CONF.api_handler,
|
cfg.CONF.set_default('api_handler', cfg.CONF.api_handler,
|
||||||
group='api_settings')
|
group='api_settings')
|
||||||
|
# TODO(tatsuma) Remove in or after "T" release
|
||||||
|
if cfg.CONF.health_manager.status_update_threads is not None:
|
||||||
|
cfg.CONF.set_default('health_update_threads',
|
||||||
|
cfg.CONF.health_manager.status_update_threads,
|
||||||
|
group='health_manager')
|
||||||
|
cfg.CONF.set_default('stats_update_threads',
|
||||||
|
cfg.CONF.health_manager.status_update_threads,
|
||||||
|
group='health_manager')
|
||||||
|
|
|
@ -155,18 +155,22 @@ class TestHeartbeatUDP(base.TestCase):
|
||||||
mock_socket.return_value = socket_mock
|
mock_socket.return_value = socket_mock
|
||||||
mock_getaddrinfo.return_value = [range(1, 6)]
|
mock_getaddrinfo.return_value = [range(1, 6)]
|
||||||
mock_dorecv = mock.Mock()
|
mock_dorecv = mock.Mock()
|
||||||
mock_executor = mock.Mock()
|
mock_health_executor = mock.Mock()
|
||||||
|
mock_stats_executor = mock.Mock()
|
||||||
|
|
||||||
getter = heartbeat_udp.UDPStatusGetter()
|
getter = heartbeat_udp.UDPStatusGetter()
|
||||||
getter.dorecv = mock_dorecv
|
getter.dorecv = mock_dorecv
|
||||||
mock_dorecv.side_effect = [(dict(id=FAKE_ID), 2)]
|
mock_dorecv.side_effect = [(dict(id=FAKE_ID), 2)]
|
||||||
getter.executor = mock_executor
|
getter.health_executor = mock_health_executor
|
||||||
|
getter.stats_executor = mock_stats_executor
|
||||||
|
|
||||||
getter.check()
|
getter.check()
|
||||||
getter.executor.shutdown()
|
getter.health_executor.shutdown()
|
||||||
mock_executor.submit.assert_has_calls(
|
getter.stats_executor.shutdown()
|
||||||
[mock.call(heartbeat_udp.update_health, {'id': 1}, 2),
|
mock_health_executor.submit.assert_has_calls(
|
||||||
mock.call(heartbeat_udp.update_stats, {'id': 1}, 2)])
|
[mock.call(heartbeat_udp.update_health, {'id': 1}, 2)])
|
||||||
|
mock_stats_executor.submit.assert_has_calls(
|
||||||
|
[mock.call(heartbeat_udp.update_stats, {'id': 1}, 2)])
|
||||||
|
|
||||||
@mock.patch('socket.getaddrinfo')
|
@mock.patch('socket.getaddrinfo')
|
||||||
@mock.patch('socket.socket')
|
@mock.patch('socket.socket')
|
||||||
|
|
|
@ -0,0 +1,10 @@
|
||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
Add new parameters to specify the number of threads for updating amphora
|
||||||
|
health and stats.
|
||||||
|
deprecations:
|
||||||
|
- |
|
||||||
|
`status_update_threads` config option for healthmanager is deprecated
|
||||||
|
because it is replaced as `health_update_threads` and
|
||||||
|
`stats_update_threads`.
|
Loading…
Reference in New Issue