diff --git a/etc/octavia.conf b/etc/octavia.conf index 2c8e2a5625..43618976cc 100644 --- a/etc/octavia.conf +++ b/etc/octavia.conf @@ -60,13 +60,20 @@ # controller_ip_port_list example: 127.0.0.1:5555, 127.0.0.1:5555 # controller_ip_port_list = # failover_threads = 10 -# status_update_threads = 50 +# status_update_threads will default to the number of processors on the host +# status_update_threads = # heartbeat_interval = 10 # heartbeat_key = # heartbeat_timeout = 60 # health_check_interval = 3 # sock_rlimit = 0 +# Health/StatsUpdate options are +# *_db +# *_logger +# health_update_driver = health_db +# stats_update_driver = stats_db + # EventStreamer options are # queue_event_streamer, # noop_event_streamer diff --git a/octavia/amphorae/drivers/health/heartbeat_udp.py b/octavia/amphorae/drivers/health/heartbeat_udp.py index 20c8c55161..4f7a7bbe53 100644 --- a/octavia/amphorae/drivers/health/heartbeat_udp.py +++ b/octavia/amphorae/drivers/health/heartbeat_udp.py @@ -18,24 +18,42 @@ import time from oslo_config import cfg from oslo_log import log as logging +from stevedore import driver as stevedore_driver from octavia.amphorae.backends.health_daemon import status_message from octavia.common import exceptions from octavia.db import repositories UDP_MAX_SIZE = 64 * 1024 +CONF = cfg.CONF LOG = logging.getLogger(__name__) +def update_health(obj): + handler = stevedore_driver.DriverManager( + namespace='octavia.amphora.health_update_drivers', + name=CONF.health_manager.health_update_driver, + invoke_on_load=True + ).driver + handler.update_health(obj) + + +def update_stats(obj): + handler = stevedore_driver.DriverManager( + namespace='octavia.amphora.stats_update_drivers', + name=CONF.health_manager.stats_update_driver, + invoke_on_load=True + ).driver + handler.update_stats(obj) + + class UDPStatusGetter(object): """This class defines methods that will gather heatbeats The heartbeats are transmitted via UDP and this class will bind to a port and absorb them """ - def __init__(self, health_update, stats_update): - self.stats_update = stats_update - self.health_update = health_update + def __init__(self): self.key = cfg.CONF.health_manager.heartbeat_key self.ip = cfg.CONF.health_manager.bind_ip self.port = cfg.CONF.health_manager.bind_port @@ -45,7 +63,7 @@ class UDPStatusGetter(object): self.sock = None self.update(self.key, self.ip, self.port) - self.executor = futures.ThreadPoolExecutor( + self.executor = futures.ProcessPoolExecutor( max_workers=cfg.CONF.health_manager.status_update_threads) self.repo = repositories.Repositories().amphorahealth @@ -172,11 +190,10 @@ class UDPStatusGetter(object): def check(self): try: - (obj, _) = self.dorecv() - if self.health_update: - self.executor.submit(self.health_update.update_health, obj) - if self.stats_update: - self.executor.submit(self.stats_update.update_stats, obj) + obj, srcaddr = self.dorecv() except exceptions.InvalidHMACException: # Pass here as the packet was dropped and logged already pass + else: + self.executor.submit(update_health, obj) + self.executor.submit(update_stats, obj) diff --git a/octavia/cmd/health_manager.py b/octavia/cmd/health_manager.py index 98b306b557..d78efcb499 100644 --- a/octavia/cmd/health_manager.py +++ b/octavia/cmd/health_manager.py @@ -27,7 +27,6 @@ from oslo_reports import guru_meditation_report as gmr from octavia.amphorae.drivers.health import heartbeat_udp from octavia.common import service from octavia.controller.healthmanager import health_manager -from octavia.controller.healthmanager import update_db from octavia import version @@ -36,11 +35,8 @@ LOG = logging.getLogger(__name__) def hm_listener(exit_event): - # TODO(german): stevedore load those drivers signal.signal(signal.SIGINT, signal.SIG_IGN) - udp_getter = heartbeat_udp.UDPStatusGetter( - update_db.UpdateHealthDb(), - update_db.UpdateStatsDb()) + udp_getter = heartbeat_udp.UDPStatusGetter() while not exit_event.is_set(): udp_getter.check() diff --git a/octavia/common/config.py b/octavia/common/config.py index 28794cbd43..e69735f54a 100644 --- a/octavia/common/config.py +++ b/octavia/common/config.py @@ -156,8 +156,8 @@ healthmanager_opts = [ default=10, help=_('Number of threads performing amphora failovers.')), cfg.IntOpt('status_update_threads', - default=50, - help=_('Number of threads performing amphora status update.')), + default=None, + help=_('Number of processes for amphora status update.')), cfg.StrOpt('heartbeat_key', help=_('key used to validate amphora sending' 'the message'), secret=True), @@ -180,6 +180,14 @@ healthmanager_opts = [ cfg.IntOpt('heartbeat_interval', default=10, help=_('Sleep time between sending heartbeats.')), + + # Used for updating health and stats + cfg.StrOpt('health_update_driver', default='health_db', + help=_('Driver for updating amphora health system.')), + cfg.StrOpt('stats_update_driver', default='stats_db', + help=_('Driver for updating amphora statistics.')), + + # Used for synchronizing neutron-lbaas and octavia cfg.StrOpt('event_streamer_driver', help=_('Specifies which driver to use for the event_streamer ' 'for syncing the octavia and neutron_lbaas dbs. If you ' diff --git a/octavia/controller/healthmanager/health_drivers/__init__.py b/octavia/controller/healthmanager/health_drivers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/octavia/controller/healthmanager/health_drivers/update_base.py b/octavia/controller/healthmanager/health_drivers/update_base.py new file mode 100644 index 0000000000..b9007b6783 --- /dev/null +++ b/octavia/controller/healthmanager/health_drivers/update_base.py @@ -0,0 +1,27 @@ +# Copyright 2018 GoDaddy +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + + +class HealthUpdateBase(object): + @abc.abstractmethod + def update_health(self, health): + raise NotImplementedError() + + +class StatsUpdateBase(object): + @abc.abstractmethod + def update_stats(self, health_message): + raise NotImplementedError() diff --git a/octavia/controller/healthmanager/update_db.py b/octavia/controller/healthmanager/health_drivers/update_db.py similarity index 98% rename from octavia/controller/healthmanager/update_db.py rename to octavia/controller/healthmanager/health_drivers/update_db.py index 3f95820da2..cb21145cec 100644 --- a/octavia/controller/healthmanager/update_db.py +++ b/octavia/controller/healthmanager/health_drivers/update_db.py @@ -23,6 +23,7 @@ from stevedore import driver as stevedore_driver from octavia.common import constants from octavia.common import stats +from octavia.controller.healthmanager.health_drivers import update_base from octavia.controller.healthmanager import update_serializer from octavia.db import api as db_api from octavia.db import repositories as repo @@ -31,7 +32,7 @@ CONF = cfg.CONF LOG = logging.getLogger(__name__) -class UpdateHealthDb(object): +class UpdateHealthDb(update_base.HealthUpdateBase): def __init__(self): super(UpdateHealthDb, self).__init__() # first setup repo for amphora, listener,member(nodes),pool repo @@ -303,7 +304,7 @@ class UpdateHealthDb(object): return lb_status -class UpdateStatsDb(stats.StatsMixin): +class UpdateStatsDb(update_base.StatsUpdateBase, stats.StatsMixin): def __init__(self): super(UpdateStatsDb, self).__init__() diff --git a/octavia/controller/healthmanager/health_drivers/update_logging.py b/octavia/controller/healthmanager/health_drivers/update_logging.py new file mode 100644 index 0000000000..f5ada76d95 --- /dev/null +++ b/octavia/controller/healthmanager/health_drivers/update_logging.py @@ -0,0 +1,29 @@ +# Copyright 2018 GoDaddy +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log as logging + +from octavia.controller.healthmanager.health_drivers import update_base + +LOG = logging.getLogger(__name__) + + +class HealthUpdateLogger(update_base.HealthUpdateBase): + def update_health(self, health): + LOG.info("Health update triggered for: %s", health.get('id')) + + +class StatsUpdateLogger(update_base.StatsUpdateBase): + def update_stats(self, health_message): + LOG.info("Stats update triggered for: %s", health_message.get('id')) diff --git a/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py b/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py index d3fa581668..86ce0bb396 100644 --- a/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py +++ b/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py @@ -38,6 +38,8 @@ FAKE_ADDRINFO = ( '', (IP, PORT) ) +HEALTH_DRIVER = 'health_logger' +STATS_DRIVER = 'stats_logger' class TestHeartbeatUDP(base.TestCase): @@ -52,6 +54,32 @@ class TestHeartbeatUDP(base.TestCase): self.conf.config(group="health_manager", bind_ip=IP) self.conf.config(group="health_manager", bind_port=PORT) self.conf.config(group="health_manager", sock_rlimit=0) + self.conf.config(group="health_manager", + health_update_driver=HEALTH_DRIVER) + self.conf.config(group="health_manager", + stats_update_driver=STATS_DRIVER) + + @mock.patch('stevedore.driver.DriverManager') + def test_update_health_func(self, driver_manager): + obj = {'id': 1} + heartbeat_udp.update_health(obj) + driver_manager.assert_called_once_with( + invoke_on_load=True, + name='health_logger', + namespace='octavia.amphora.health_update_drivers' + ) + driver_manager().driver.update_health.assert_called_once_with(obj) + + @mock.patch('stevedore.driver.DriverManager') + def test_update_stats_func(self, driver_manager): + obj = {'id': 1} + heartbeat_udp.update_stats(obj) + driver_manager.assert_called_once_with( + invoke_on_load=True, + name='stats_logger', + namespace='octavia.amphora.stats_update_drivers' + ) + driver_manager().driver.update_stats.assert_called_once_with(obj) @mock.patch('socket.getaddrinfo') @mock.patch('socket.socket') @@ -62,7 +90,7 @@ class TestHeartbeatUDP(base.TestCase): bind_mock = mock.MagicMock() socket_mock.bind = bind_mock - getter = heartbeat_udp.UDPStatusGetter(None, None) + getter = heartbeat_udp.UDPStatusGetter() mock_getaddrinfo.assert_called_with(IP, PORT, 0, socket.SOCK_DGRAM) self.assertEqual((IP, PORT), getter.sockaddr) @@ -82,7 +110,7 @@ class TestHeartbeatUDP(base.TestCase): recvfrom = mock.MagicMock() socket_mock.recvfrom = recvfrom - getter = heartbeat_udp.UDPStatusGetter(None, None) + getter = heartbeat_udp.UDPStatusGetter() # key = 'TEST' msg = {"testkey": "TEST"} sample_msg = ('78daab562a492d2ec94ead54b252500a710d0e5' @@ -102,37 +130,24 @@ class TestHeartbeatUDP(base.TestCase): mock_socket.return_value = socket_mock mock_getaddrinfo.return_value = [range(1, 6)] mock_dorecv = mock.Mock() + mock_executor = mock.Mock() - getter = heartbeat_udp.UDPStatusGetter( - self.health_update, self.stats_update) + getter = heartbeat_udp.UDPStatusGetter() getter.dorecv = mock_dorecv mock_dorecv.side_effect = [(dict(id=FAKE_ID), 2)] + getter.executor = mock_executor getter.check() getter.executor.shutdown() - self.health_update.update_health.assert_called_once_with({'id': 1}) - self.stats_update.update_stats.assert_called_once_with({'id': 1}) - - @mock.patch('socket.getaddrinfo') - @mock.patch('socket.socket') - def test_check_no_mixins(self, mock_socket, mock_getaddrinfo): - self.mock_socket = mock_socket - self.mock_getaddrinfo = mock_getaddrinfo - self.mock_getaddrinfo.return_value = [range(1, 6)] - - mock_dorecv = mock.Mock() - getter = heartbeat_udp.UDPStatusGetter(None, None) - - getter.dorecv = mock_dorecv - mock_dorecv.side_effect = [(dict(id=FAKE_ID), 2)] - - getter.check() + mock_executor.submit.assert_has_calls( + [mock.call(heartbeat_udp.update_health, {'id': 1}), + mock.call(heartbeat_udp.update_stats, {'id': 1})]) @mock.patch('socket.getaddrinfo') @mock.patch('socket.socket') def test_socket_except(self, mock_socket, mock_getaddrinfo): self.assertRaises(exceptions.NetworkConfig, - heartbeat_udp.UDPStatusGetter, None, None) + heartbeat_udp.UDPStatusGetter) @mock.patch('concurrent.futures.ThreadPoolExecutor.submit') @mock.patch('socket.getaddrinfo') @@ -143,7 +158,7 @@ class TestHeartbeatUDP(base.TestCase): self.mock_getaddrinfo.return_value = [range(1, 6)] mock_dorecv = mock.Mock() - getter = heartbeat_udp.UDPStatusGetter(None, None) + getter = heartbeat_udp.UDPStatusGetter() getter.dorecv = mock_dorecv mock_dorecv.side_effect = exceptions.InvalidHMACException diff --git a/octavia/tests/unit/cmd/test_health_manager.py b/octavia/tests/unit/cmd/test_health_manager.py index 907e7f0394..d8a8f1e322 100644 --- a/octavia/tests/unit/cmd/test_health_manager.py +++ b/octavia/tests/unit/cmd/test_health_manager.py @@ -26,13 +26,9 @@ class TestHealthManagerCMD(base.TestCase): super(TestHealthManagerCMD, self).setUp() @mock.patch('multiprocessing.Event') - @mock.patch('octavia.controller.healthmanager.' - 'update_db.UpdateStatsDb') - @mock.patch('octavia.controller.healthmanager.' - 'update_db.UpdateHealthDb') @mock.patch('octavia.amphorae.drivers.health.' 'heartbeat_udp.UDPStatusGetter') - def test_hm_listener(self, mock_getter, mock_health, mock_stats, + def test_hm_listener(self, mock_getter, mock_event): mock_event.is_set.side_effect = [False, False] getter_mock = mock.MagicMock() @@ -42,7 +38,7 @@ class TestHealthManagerCMD(base.TestCase): mock_getter.return_value = getter_mock self.assertRaisesRegexp(Exception, 'break', health_manager.hm_listener, mock_event) - mock_getter.assert_called_once_with(mock_health(), mock_stats()) + mock_getter.assert_called_once() self.assertEqual(2, getter_mock.check.call_count) @mock.patch('multiprocessing.Event') diff --git a/octavia/tests/unit/controller/healthmanager/health_drivers/__init__.py b/octavia/tests/unit/controller/healthmanager/health_drivers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_base.py b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_base.py new file mode 100644 index 0000000000..1fa01e469e --- /dev/null +++ b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_base.py @@ -0,0 +1,38 @@ +# Copyright 2018 GoDaddy +# Copyright (c) 2015 Rackspace +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from octavia.controller.healthmanager.health_drivers import update_base +from octavia.tests.unit import base + + +class TestHealthUpdateBase(base.TestCase): + + def setUp(self): + super(TestHealthUpdateBase, self).setUp() + self.logger = update_base.HealthUpdateBase() + + def test_update_health(self): + self.assertRaises(NotImplementedError, + self.logger.update_health, {'id': 1}) + + +class TestStatsUpdateBase(base.TestCase): + def setUp(self): + super(TestStatsUpdateBase, self).setUp() + self.logger = update_base.StatsUpdateBase() + + def test_update_stats(self): + self.assertRaises(NotImplementedError, + self.logger.update_stats, {'id': 1}) diff --git a/octavia/tests/unit/controller/healthmanager/test_update_db.py b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py similarity index 99% rename from octavia/tests/unit/controller/healthmanager/test_update_db.py rename to octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py index 8707ec9f9d..1d92a51aaf 100644 --- a/octavia/tests/unit/controller/healthmanager/test_update_db.py +++ b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py @@ -24,7 +24,7 @@ import sqlalchemy from octavia.common import constants from octavia.common import data_models -from octavia.controller.healthmanager import update_db +from octavia.controller.healthmanager.health_drivers import update_db from octavia.db import models as db_models from octavia.tests.unit import base diff --git a/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_logging.py b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_logging.py new file mode 100644 index 0000000000..693efc435f --- /dev/null +++ b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_logging.py @@ -0,0 +1,44 @@ +# Copyright 2018 GoDaddy +# Copyright (c) 2015 Rackspace +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from octavia.controller.healthmanager.health_drivers import update_logging +from octavia.tests.unit import base + + +class TestHealthUpdateLogger(base.TestCase): + + def setUp(self): + super(TestHealthUpdateLogger, self).setUp() + self.logger = update_logging.HealthUpdateLogger() + + @mock.patch('octavia.controller.healthmanager.health_drivers' + '.update_logging.LOG') + def test_update_health(self, mock_log): + self.logger.update_health({'id': 1}) + self.assertEqual(1, mock_log.info.call_count) + + +class TestStatsUpdateLogger(base.TestCase): + def setUp(self): + super(TestStatsUpdateLogger, self).setUp() + self.logger = update_logging.StatsUpdateLogger() + + @mock.patch('octavia.controller.healthmanager.health_drivers' + '.update_logging.LOG') + def test_update_stats(self, mock_log): + self.logger.update_stats({'id': 1}) + self.assertEqual(1, mock_log.info.call_count) diff --git a/setup.cfg b/setup.cfg index 8af396c6b8..7c013cddc7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -75,6 +75,12 @@ octavia.api.handlers = octavia.amphora.drivers = amphora_noop_driver = octavia.amphorae.drivers.noop_driver.driver:NoopAmphoraLoadBalancerDriver amphora_haproxy_rest_driver = octavia.amphorae.drivers.haproxy.rest_api_driver:HaproxyAmphoraLoadBalancerDriver +octavia.amphora.health_update_drivers = + health_logger = octavia.controller.healthmanager.health_drivers.update_logging:HealthUpdateLogger + health_db = octavia.controller.healthmanager.health_drivers.update_db:UpdateHealthDb +octavia.amphora.stats_update_drivers = + stats_logger = octavia.controller.healthmanager.health_drivers.update_logging:StatsUpdateLogger + stats_db = octavia_controller.healthmanager.health_drivers.update_db:UpdateStatsDb octavia.controller.queues = noop_event_streamer = octavia.controller.queue.event_queue:EventStreamerNoop queue_event_streamer = octavia.controller.queue.event_queue:EventStreamerNeutron