From 1417f6f0f8cbf13fd26440ae2dc53de42873d6cc Mon Sep 17 00:00:00 2001 From: Michael Johnson Date: Fri, 13 Apr 2018 16:38:31 -0700 Subject: [PATCH] Improve the error logging for zombie amphora In the case that nova failed to delete an amphroa they will continue to send health heartbeat messages the the health manager. This patch improves the logging of these amphora. It also optimizes the statistics update flow when event streaming is disabled by removing two extra database calls. This patch also removes the un-used BaseControllerTask class. This patch also finally solidifies that there will be one LB per amphora. Change-Id: Idf83b19216c680a4854c1239ed9c5bc5ce7364a7 --- .../amphorae/drivers/health/heartbeat_udp.py | 14 +- octavia/common/constants.py | 2 + .../health_drivers/update_base.py | 4 +- .../healthmanager/health_drivers/update_db.py | 198 ++++++++++-------- .../health_drivers/update_logging.py | 4 +- .../controller/worker/controller_worker.py | 5 +- .../worker/tasks/controller_tasks.py | 50 ----- octavia/db/repositories.py | 30 ++- .../tests/functional/db/test_repositories.py | 9 +- .../drivers/health/test_heartbeat_udp.py | 18 +- .../health_drivers/test_update_base.py | 4 +- .../health_drivers/test_update_db.py | 138 ++++++------ .../health_drivers/test_update_logging.py | 4 +- .../worker/tasks/test_controller_tasks.py | 80 ------- .../worker/test_controller_worker.py | 6 +- 15 files changed, 241 insertions(+), 325 deletions(-) delete mode 100644 octavia/controller/worker/tasks/controller_tasks.py delete mode 100644 octavia/tests/unit/controller/worker/tasks/test_controller_tasks.py diff --git a/octavia/amphorae/drivers/health/heartbeat_udp.py b/octavia/amphorae/drivers/health/heartbeat_udp.py index 89e4d973ff..c4b07cdbf3 100644 --- a/octavia/amphorae/drivers/health/heartbeat_udp.py +++ b/octavia/amphorae/drivers/health/heartbeat_udp.py @@ -29,22 +29,22 @@ CONF = cfg.CONF LOG = logging.getLogger(__name__) -def update_health(obj): +def update_health(obj, srcaddr): handler = stevedore_driver.DriverManager( namespace='octavia.amphora.health_update_drivers', name=CONF.health_manager.health_update_driver, invoke_on_load=True ).driver - handler.update_health(obj) + handler.update_health(obj, srcaddr) -def update_stats(obj): +def update_stats(obj, srcaddr): handler = stevedore_driver.DriverManager( namespace='octavia.amphora.stats_update_drivers', name=CONF.health_manager.stats_update_driver, invoke_on_load=True ).driver - handler.update_stats(obj) + handler.update_stats(obj, srcaddr) class UDPStatusGetter(object): @@ -193,7 +193,7 @@ class UDPStatusGetter(object): 'Exception: %s', srcaddr, e) raise exceptions.InvalidHMACException() obj['recv_time'] = time.time() - return obj, srcaddr + return obj, srcaddr[0] def check(self): try: @@ -209,5 +209,5 @@ class UDPStatusGetter(object): 'heartbeat packet. Ignoring this packet. ' 'Exception: %s', e) else: - self.executor.submit(update_health, obj) - self.executor.submit(update_stats, obj) + self.executor.submit(update_health, obj, srcaddr) + self.executor.submit(update_stats, obj, srcaddr) diff --git a/octavia/common/constants.py b/octavia/common/constants.py index fed20fb014..b60579c240 100644 --- a/octavia/common/constants.py +++ b/octavia/common/constants.py @@ -400,6 +400,8 @@ MAX_QUOTA = 2000000000 API_VERSION = '0.5' +NOOP_EVENT_STREAMER = 'noop_event_streamer' + HAPROXY_BASE_PEER_PORT = 1025 KEEPALIVED_JINJA2_UPSTART = 'keepalived.upstart.j2' KEEPALIVED_JINJA2_SYSTEMD = 'keepalived.systemd.j2' diff --git a/octavia/controller/healthmanager/health_drivers/update_base.py b/octavia/controller/healthmanager/health_drivers/update_base.py index b9007b6783..24056deb68 100644 --- a/octavia/controller/healthmanager/health_drivers/update_base.py +++ b/octavia/controller/healthmanager/health_drivers/update_base.py @@ -17,11 +17,11 @@ import abc class HealthUpdateBase(object): @abc.abstractmethod - def update_health(self, health): + def update_health(self, health, srcaddr): raise NotImplementedError() class StatsUpdateBase(object): @abc.abstractmethod - def update_stats(self, health_message): + def update_stats(self, health_message, srcaddr): raise NotImplementedError() diff --git a/octavia/controller/healthmanager/health_drivers/update_db.py b/octavia/controller/healthmanager/health_drivers/update_db.py index c17cc35e91..7fb7d62a76 100644 --- a/octavia/controller/healthmanager/health_drivers/update_db.py +++ b/octavia/controller/healthmanager/health_drivers/update_db.py @@ -68,22 +68,24 @@ class UpdateHealthDb(update_base.HealthUpdateBase): message.update({constants.OPERATING_STATUS: new_op_status}) if self.sync_prv_status: LOG.debug("%s %s provisioning_status %s. " - "Updating db and sending event.", + "Sending event.", entity_type, entity_id, current_prov_status) message.update( {constants.PROVISIONING_STATUS: current_prov_status}) if message: self.emit(entity_type, entity_id, message) - def update_health(self, health): + def update_health(self, health, srcaddr): # The executor will eat any exceptions from the update_health code # so we need to wrap it and log the unhandled exception try: - self._update_health(health) + self._update_health(health, srcaddr) except Exception: - LOG.exception('update_health encountered an unknown error') + LOG.exception('update_health encountered an unknown error ' + 'processing health message for amphora {0} with IP ' + '{1}'.format(health['id'], srcaddr)) - def _update_health(self, health): + def _update_health(self, health, srcaddr): """This function is to update db info based on amphora status :param health: map object that contains amphora, listener, member info @@ -110,19 +112,30 @@ class UpdateHealthDb(update_base.HealthUpdateBase): session = db_api.get_session() # We need to see if all of the listeners are reporting in - expected_listener_count = 0 - db_lbs_on_amp = self.amphora_repo.get_all_lbs_on_amphora(session, - health['id']) + db_lb = self.amphora_repo.get_lb_for_amphora(session, health['id']) ignore_listener_count = False listeners = health['listeners'] + if not db_lb: + # If this is not a spare amp, log and skip it. + amp = self.amphora_repo.get(session, id=health['id']) + if not amp or amp.load_balancer_id: + # This is debug and not warning because this can happen under + # normal deleting operations. + LOG.debug('Received a health heartbeat from amphora {0} with ' + 'IP {1} that should not exist. This amphora may be ' + 'in the process of being deleted, in which case you ' + 'will only see this message a few times. However if ' + 'it is repeating this amphora should be manually ' + 'deleted.'.format(health['id'], srcaddr)) + return + # We need to loop over the lbs here to make sure we update the # amphora_health record as soon as possible to prevent unnecessary # failovers. Unfortunately that means looping over this list twice. - for db_lb in db_lbs_on_amp: - expected_listener_count += len(db_lb.listeners) - if 'PENDING' in db_lb.provisioning_status: - ignore_listener_count = True + expected_listener_count = len(db_lb.listeners) + if 'PENDING' in db_lb.provisioning_status: + ignore_listener_count = True # Do not update amphora health if the reporting listener count # does not match the expected listener count @@ -157,78 +170,76 @@ class UpdateHealthDb(update_base.HealthUpdateBase): {'id': health['id'], 'found': len(listeners), 'expected': expected_listener_count}) - for db_lb in db_lbs_on_amp: + processed_pools = [] - processed_pools = [] + # We got a heartbeat so lb is healthy until proven otherwise + if db_lb.enabled is False: + lb_status = constants.OFFLINE + else: + lb_status = constants.ONLINE - # We got a heartbeat so lb is healthy until proven otherwise - if db_lb.enabled is False: - lb_status = constants.OFFLINE + for db_listener in db_lb.listeners: + listener_status = None + listener_id = db_listener.id + listener = None + + if listener_id not in listeners: + listener_status = constants.OFFLINE else: - lb_status = constants.ONLINE + listener = listeners[listener_id] - for db_listener in db_lb.listeners: - listener_status = None - listener_id = db_listener.id - listener = None - - if listener_id not in listeners: - listener_status = constants.OFFLINE + # OPEN = HAProxy listener status nbconn < maxconn + if listener.get('status') == constants.OPEN: + listener_status = constants.ONLINE + # FULL = HAProxy listener status not nbconn < maxconn + elif listener.get('status') == constants.FULL: + listener_status = constants.DEGRADED + if lb_status == constants.ONLINE: + lb_status = constants.DEGRADED else: - listener = listeners[listener_id] + LOG.warning(('Listener %(list)s reported status of ' + '%(status)s'), + {'list': listener_id, + 'status': listener.get('status')}) - # OPEN = HAProxy listener status nbconn < maxconn - if listener.get('status') == constants.OPEN: - listener_status = constants.ONLINE - # FULL = HAProxy listener status not nbconn < maxconn - elif listener.get('status') == constants.FULL: - listener_status = constants.DEGRADED - if lb_status == constants.ONLINE: - lb_status = constants.DEGRADED - else: - LOG.warning(('Listener %(list)s reported status of ' - '%(status)s'), - {'list': listener_id, - 'status': listener.get('status')}) - - try: - if listener_status is not None: - self._update_status_and_emit_event( - session, self.listener_repo, constants.LISTENER, - listener_id, listener_status, - db_listener.operating_status, - db_listener.provisioning_status - ) - except sqlalchemy.orm.exc.NoResultFound: - LOG.error("Listener %s is not in DB", listener_id) - - if not listener: - continue - - pools = listener['pools'] - - # Process pools bound to listeners - for db_pool in db_listener.pools: - lb_status = self._process_pool_status( - session, db_pool, pools, lb_status, processed_pools) - - # Process pools bound to the load balancer - for db_pool in db_lb.pools: - # Don't re-process pools shared with listeners - if db_pool.id in processed_pools: - continue - lb_status = self._process_pool_status( - session, db_pool, [], lb_status, processed_pools) - - # Update the load balancer status last try: - self._update_status_and_emit_event( - session, self.loadbalancer_repo, - constants.LOADBALANCER, db_lb.id, lb_status, - db_lb.operating_status, db_lb.provisioning_status - ) + if listener_status is not None: + self._update_status_and_emit_event( + session, self.listener_repo, constants.LISTENER, + listener_id, listener_status, + db_listener.operating_status, + db_listener.provisioning_status + ) except sqlalchemy.orm.exc.NoResultFound: - LOG.error("Load balancer %s is not in DB", db_lb.id) + LOG.error("Listener %s is not in DB", listener_id) + + if not listener: + continue + + pools = listener['pools'] + + # Process pools bound to listeners + for db_pool in db_listener.pools: + lb_status = self._process_pool_status( + session, db_pool, pools, lb_status, processed_pools) + + # Process pools bound to the load balancer + for db_pool in db_lb.pools: + # Don't re-process pools shared with listeners + if db_pool.id in processed_pools: + continue + lb_status = self._process_pool_status( + session, db_pool, [], lb_status, processed_pools) + + # Update the load balancer status last + try: + self._update_status_and_emit_event( + session, self.loadbalancer_repo, + constants.LOADBALANCER, db_lb.id, lb_status, + db_lb.operating_status, db_lb.provisioning_status + ) + except sqlalchemy.orm.exc.NoResultFound: + LOG.error("Load balancer %s is not in DB", db_lb.id) LOG.debug('Health Update finished in: {0} seconds'.format( timeit.default_timer() - start_time)) @@ -335,15 +346,17 @@ class UpdateStatsDb(update_base.StatsUpdateBase, stats.StatsMixin): cnt = update_serializer.InfoContainer(info_type, info_id, info_obj) self.event_streamer.emit(cnt) - def update_stats(self, health_message): + def update_stats(self, health_message, srcaddr): # The executor will eat any exceptions from the update_stats code # so we need to wrap it and log the unhandled exception try: - self._update_stats(health_message) + self._update_stats(health_message, srcaddr) except Exception: - LOG.exception('update_stats encountered an unknown error') + LOG.exception('update_stats encountered an unknown error ' + 'processing stats for amphora {0} with IP ' + '{1}'.format(health_message['id'], srcaddr)) - def _update_stats(self, health_message): + def _update_stats(self, health_message, srcaddr): """This function is to update the db with listener stats :param health_message: The health message containing the listener stats @@ -392,12 +405,21 @@ class UpdateStatsDb(update_base.StatsUpdateBase, stats.StatsMixin): self.listener_stats_repo.replace( session, listener_id, amphora_id, **stats) - listener_stats = self.get_listener_stats(session, listener_id) - self.emit( - 'listener_stats', listener_id, listener_stats.get_stats()) + if (CONF.health_manager.event_streamer_driver != + constants.NOOP_EVENT_STREAMER): + listener_stats = self.get_listener_stats(session, listener_id) + self.emit( + 'listener_stats', listener_id, listener_stats.get_stats()) - listener_db = self.repo_listener.get(session, id=listener_id) - lb_stats = self.get_loadbalancer_stats( - session, listener_db.load_balancer_id) - self.emit('loadbalancer_stats', - listener_db.load_balancer_id, lb_stats.get_stats()) + listener_db = self.repo_listener.get(session, id=listener_id) + if not listener_db: + LOG.debug('Received health stats for a non-existent ' + 'listener {0} for amphora {1} with IP ' + '{2}.'.format(listener_id, amphora_id, + srcaddr)) + return + + lb_stats = self.get_loadbalancer_stats( + session, listener_db.load_balancer_id) + self.emit('loadbalancer_stats', + listener_db.load_balancer_id, lb_stats.get_stats()) diff --git a/octavia/controller/healthmanager/health_drivers/update_logging.py b/octavia/controller/healthmanager/health_drivers/update_logging.py index f5ada76d95..156657cd9f 100644 --- a/octavia/controller/healthmanager/health_drivers/update_logging.py +++ b/octavia/controller/healthmanager/health_drivers/update_logging.py @@ -20,10 +20,10 @@ LOG = logging.getLogger(__name__) class HealthUpdateLogger(update_base.HealthUpdateBase): - def update_health(self, health): + def update_health(self, health, srcaddr): LOG.info("Health update triggered for: %s", health.get('id')) class StatsUpdateLogger(update_base.StatsUpdateBase): - def update_stats(self, health_message): + def update_stats(self, health_message, srcaddr): LOG.info("Stats update triggered for: %s", health_message.get('id')) diff --git a/octavia/controller/worker/controller_worker.py b/octavia/controller/worker/controller_worker.py index ca61a5aa7f..db16ab293a 100644 --- a/octavia/controller/worker/controller_worker.py +++ b/octavia/controller/worker/controller_worker.py @@ -748,11 +748,10 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): # if we run with anti-affinity we need to set the server group # as well if CONF.nova.enable_anti_affinity: - lb = self._amphora_repo.get_all_lbs_on_amphora( + lb = self._amphora_repo.get_lb_for_amphora( db_apis.get_session(), amp.id) if lb: - stored_params[constants.SERVER_GROUP_ID] = ( - lb[0].server_group_id) + stored_params[constants.SERVER_GROUP_ID] = lb.server_group_id failover_amphora_tf = self._taskflow_load( self._amphora_flows.get_failover_flow( diff --git a/octavia/controller/worker/tasks/controller_tasks.py b/octavia/controller/worker/tasks/controller_tasks.py deleted file mode 100644 index deac28e68e..0000000000 --- a/octavia/controller/worker/tasks/controller_tasks.py +++ /dev/null @@ -1,50 +0,0 @@ -# Copyright 2015 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -from taskflow import task - -from octavia.db import api as db_apis -from octavia.db import repositories as repo - - -class BaseControllerTask(task.Task): - """Base task to load drivers common to the tasks.""" - - def __init__(self, **kwargs): - from octavia.controller.worker import controller_worker - self.cntrlr_worker = controller_worker.ControllerWorker() - self.listener_repo = repo.ListenerRepository() - self.amp_repo = repo.AmphoraRepository() - self.pool_repo = repo.PoolRepository() - super(BaseControllerTask, self).__init__(**kwargs) - - -# todo(xgerman): Make sure this is used outside tests -class DeleteLoadBalancersOnAmp(BaseControllerTask): - """Delete the load balancers on an amphora.""" - - def execute(self, amphora): - """Deletes the load balancers on an amphora. - - Iterate across the load balancers on an amphora and - call back into the controller worker to delete the - load balancers. - - :param amphora: The amphora to delete the load balancers from - """ - lbs = self.amp_repo.get_all_lbs_on_amphora(db_apis.get_session(), - amphora_id=amphora.id) - for lb in lbs: - self.cntrlr_worker.delete_load_balancer(lb.id) diff --git a/octavia/db/repositories.py b/octavia/db/repositories.py index 24853902f9..f856354649 100644 --- a/octavia/db/repositories.py +++ b/octavia/db/repositories.py @@ -938,21 +938,33 @@ class AmphoraRepository(BaseRepository): return amp.to_data_model() - def get_all_lbs_on_amphora(self, session, amphora_id): + @staticmethod + def get_lb_for_amphora(session, amphora_id): """Get all of the load balancers on an amphora. :param session: A Sql Alchemy database session. :param amphora_id: The amphora id to list the load balancers from :returns: [octavia.common.data_model] """ - with session.begin(subtransactions=True): - lb_subquery = (session.query(self.model_class.load_balancer_id). - filter_by(id=amphora_id).subquery()) - lb_list = (session.query(models.LoadBalancer). - filter(models.LoadBalancer.id.in_(lb_subquery)). - options(joinedload('*')).all()) - data_model_list = [model.to_data_model() for model in lb_list] - return data_model_list + with session.begin(): + db_lb = ( + # Get LB records + session.query(models.LoadBalancer) + # Joined to amphora records + .filter(models.LoadBalancer.id == + models.Amphora.load_balancer_id) + # For just this amphora + .filter(models.Amphora.id == amphora_id) + # Where the amphora is not DELETED + .filter(models.Amphora.status != consts.DELETED) + # And the LB is also not DELETED + .filter(models.LoadBalancer.provisioning_status != + consts.DELETED) + # And what does this do? Some SQLAlchemy magic? + .options(joinedload('*')) + ).first() + if db_lb: + return db_lb.to_data_model() def get_all_deleted_expiring_amphora(self, session, exp_age=None): diff --git a/octavia/tests/functional/db/test_repositories.py b/octavia/tests/functional/db/test_repositories.py index ddaab2c05b..301c98609c 100644 --- a/octavia/tests/functional/db/test_repositories.py +++ b/octavia/tests/functional/db/test_repositories.py @@ -3032,13 +3032,12 @@ class AmphoraRepositoryTest(BaseRepositoryTest): self.assertIsNotNone(new_amphora) self.assertIsInstance(new_amphora, models.Amphora) - def test_get_all_lbs_on_amphora(self): + def test_get_lb_for_amphora(self): amphora = self.create_amphora(self.FAKE_UUID_1) self.amphora_repo.associate(self.session, self.lb.id, amphora.id) - lb_list = self.amphora_repo.get_all_lbs_on_amphora(self.session, - amphora.id) - self.assertIsNotNone(lb_list) - self.assertIn(self.lb, lb_list) + lb = self.amphora_repo.get_lb_for_amphora(self.session, amphora.id) + self.assertIsNotNone(lb) + self.assertEqual(self.lb, lb) def get_all_deleted_expiring_amphora(self): exp_age = datetime.timedelta(seconds=self.FAKE_EXP_AGE) diff --git a/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py b/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py index 2aafd65f2b..f58b27c551 100644 --- a/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py +++ b/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py @@ -62,24 +62,26 @@ class TestHeartbeatUDP(base.TestCase): @mock.patch('stevedore.driver.DriverManager') def test_update_health_func(self, driver_manager): obj = {'id': 1} - heartbeat_udp.update_health(obj) + heartbeat_udp.update_health(obj, '192.0.2.1') driver_manager.assert_called_once_with( invoke_on_load=True, name='health_logger', namespace='octavia.amphora.health_update_drivers' ) - driver_manager().driver.update_health.assert_called_once_with(obj) + driver_manager().driver.update_health.assert_called_once_with( + obj, '192.0.2.1') @mock.patch('stevedore.driver.DriverManager') def test_update_stats_func(self, driver_manager): obj = {'id': 1} - heartbeat_udp.update_stats(obj) + heartbeat_udp.update_stats(obj, '192.0.2.1') driver_manager.assert_called_once_with( invoke_on_load=True, name='stats_logger', namespace='octavia.amphora.stats_update_drivers' ) - driver_manager().driver.update_stats.assert_called_once_with(obj) + driver_manager().driver.update_stats.assert_called_once_with( + obj, '192.0.2.1') @mock.patch('socket.getaddrinfo') @mock.patch('socket.socket') @@ -117,9 +119,9 @@ class TestHeartbeatUDP(base.TestCase): '1aa050041b506245806e5c1971e79951818394e' 'a6e71ad989ff950945f9573f4ab6f83e25db8ed7') bin_msg = binascii.unhexlify(sample_msg) - recvfrom.return_value = bin_msg, 2 + recvfrom.return_value = bin_msg, ('192.0.2.1', 2) (obj, srcaddr) = getter.dorecv() - self.assertEqual(2, srcaddr) + self.assertEqual('192.0.2.1', srcaddr) self.assertIsNotNone(obj.pop('recv_time')) self.assertEqual({"testkey": "TEST"}, obj) @@ -163,8 +165,8 @@ class TestHeartbeatUDP(base.TestCase): getter.check() getter.executor.shutdown() mock_executor.submit.assert_has_calls( - [mock.call(heartbeat_udp.update_health, {'id': 1}), - mock.call(heartbeat_udp.update_stats, {'id': 1})]) + [mock.call(heartbeat_udp.update_health, {'id': 1}, 2), + mock.call(heartbeat_udp.update_stats, {'id': 1}, 2)]) @mock.patch('socket.getaddrinfo') @mock.patch('socket.socket') diff --git a/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_base.py b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_base.py index 1fa01e469e..d6afda28bd 100644 --- a/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_base.py +++ b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_base.py @@ -25,7 +25,7 @@ class TestHealthUpdateBase(base.TestCase): def test_update_health(self): self.assertRaises(NotImplementedError, - self.logger.update_health, {'id': 1}) + self.logger.update_health, {'id': 1}, '192.0.2.1') class TestStatsUpdateBase(base.TestCase): @@ -35,4 +35,4 @@ class TestStatsUpdateBase(base.TestCase): def test_update_stats(self): self.assertRaises(NotImplementedError, - self.logger.update_stats, {'id': 1}) + self.logger.update_stats, {'id': 1}, '192.0.2.1') diff --git a/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py index 291867b00e..ab18bcb520 100644 --- a/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py +++ b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py @@ -66,7 +66,7 @@ class TestUpdateHealthDb(base.TestCase): self.hm.amphora_repo = self.amphora_repo fake_lb = mock.MagicMock() - self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [fake_lb] + self.hm.amphora_repo.get_lb_for_amphora.return_value = fake_lb self.hm.amphora_health_repo = self.amphora_health_repo self.hm.listener_repo = self.listener_repo self.hm.listener_repo.count.return_value = 1 @@ -132,9 +132,9 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') self.event_client.cast.assert_any_call( {}, 'update_info', container={ 'info_type': 'listener', 'info_id': 'listener-id-1', @@ -158,10 +158,10 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(listener=False, pool=False)) - self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) - self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called) + self.hm.update_health(health, '192.0.2.1') + self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) self.assertTrue(self.loadbalancer_repo.update.called) self.assertTrue(self.amphora_health_repo.replace.called) @@ -176,10 +176,10 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(listener=True, pool=False, lb_prov_status=constants.PENDING_UPDATE)) - self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) - self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called) + self.hm.update_health(health, '192.0.2.1') + self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) self.assertTrue(self.loadbalancer_repo.update.called) self.assertTrue(self.amphora_health_repo.replace.called) @@ -193,10 +193,10 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(listener=True, pool=False)) - self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) - self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called) + self.hm.update_health(health, '192.0.2.1') + self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) self.assertTrue(self.loadbalancer_repo.update.called) self.assertFalse(self.amphora_health_repo.replace.called) @@ -210,10 +210,10 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(listener=False, pool=False)) - self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) - self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called) + self.hm.update_health(health, '192.0.2.1') + self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) # Receive time is stale, so we shouldn't see this called self.assertFalse(self.loadbalancer_repo.update.called) @@ -221,15 +221,25 @@ class TestUpdateHealthDb(base.TestCase): health = { "id": self.FAKE_UUID_1, - "listeners": {}, + "listeners": { + "listener-id-1": {"status": constants.OPEN, "pools": { + "pool-id-1": {"status": constants.UP, + "members": {"member-id-1": constants.UP} + } + } + } + }, "recv_time": time.time() } self.session_mock.commit.side_effect = TestException('boom') - self.amphora_repo.get_all_lbs_on_amphora.return_value = [] + mock_lb, mock_listener1, mock_pool1, mock_member1 = ( + self._make_mock_lb_tree()) - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) self.session_mock.rollback.assert_called_once() @@ -250,8 +260,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -281,7 +291,7 @@ class TestUpdateHealthDb(base.TestCase): mock_lb.listeners = [mock_listener1, mock_listener2] self.amphora_health_repo.replace.reset_mock() - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') self.assertTrue(not self.amphora_health_repo.replace.called) def test_update_lb_pool_health_offline(self): @@ -297,8 +307,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -344,8 +354,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb.listeners.append(mock_listener2) mock_lb.pools.append(mock_pool2) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -381,8 +391,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -423,8 +433,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -454,8 +464,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -494,8 +504,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -534,8 +544,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -570,9 +580,9 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -613,9 +623,9 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(health_monitor=False)) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -653,9 +663,9 @@ class TestUpdateHealthDb(base.TestCase): self._make_mock_lb_tree(health_monitor=False)) mock_members[0].admin_state_up = False mock_members[0].operating_status = constants.NO_MONITOR - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -694,8 +704,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -735,9 +745,9 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(members=2)) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) - self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') + self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) # test listener, member for listener_id, listener in six.iteritems( @@ -780,8 +790,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -811,7 +821,7 @@ class TestUpdateHealthDb(base.TestCase): mock_lb.listeners.append(mock_listener2) self.amphora_health_repo.replace.reset_mock() - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') self.assertTrue(not self.amphora_health_repo.replace.called) def test_update_health_error(self): @@ -831,9 +841,9 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -922,9 +932,9 @@ class TestUpdateHealthDb(base.TestCase): mock_listener.pools = [mock_pool] mock_lb.listeners.append(mock_listener) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') # test listener self.listener_repo.update.assert_any_call( @@ -973,9 +983,9 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -1024,9 +1034,9 @@ class TestUpdateHealthDb(base.TestCase): mock_pool1.operating_status = constants.ONLINE mock_listener1.operating_status = constants.ONLINE mock_lb.operating_status = constants.ONLINE - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') self.event_client.cast.assert_not_called() self.loadbalancer_repo.update.assert_not_called() self.listener_repo.update.assert_not_called() @@ -1042,10 +1052,10 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(listener=False, pool=False)) mock_lb.enabled = False - self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) - self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called) + self.hm.update_health(health, '192.0.2.1') + self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) self.assertTrue(self.loadbalancer_repo.update.called) self.loadbalancer_repo.update.assert_called_with( self.mock_session(), mock_lb.id, @@ -1060,10 +1070,10 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(listener=False, pool=False)) mock_lb.enabled = True - self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) - self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called) + self.hm.update_health(health, '192.0.2.1') + self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) self.assertTrue(self.loadbalancer_repo.update.called) self.loadbalancer_repo.update.assert_called_with( self.mock_session(), mock_lb.id, @@ -1146,7 +1156,7 @@ class TestUpdateStatsDb(base.TestCase): session.return_value = 'blah' - self.sm.update_stats(health) + self.sm.update_stats(health, '192.0.2.1') self.listener_stats_repo.replace.assert_called_once_with( 'blah', self.listener_id, self.amphora_id, diff --git a/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_logging.py b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_logging.py index 693efc435f..b0669cc6eb 100644 --- a/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_logging.py +++ b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_logging.py @@ -28,7 +28,7 @@ class TestHealthUpdateLogger(base.TestCase): @mock.patch('octavia.controller.healthmanager.health_drivers' '.update_logging.LOG') def test_update_health(self, mock_log): - self.logger.update_health({'id': 1}) + self.logger.update_health({'id': 1}, '192.0.2.1') self.assertEqual(1, mock_log.info.call_count) @@ -40,5 +40,5 @@ class TestStatsUpdateLogger(base.TestCase): @mock.patch('octavia.controller.healthmanager.health_drivers' '.update_logging.LOG') def test_update_stats(self, mock_log): - self.logger.update_stats({'id': 1}) + self.logger.update_stats({'id': 1}, '192.0.2.1') self.assertEqual(1, mock_log.info.call_count) diff --git a/octavia/tests/unit/controller/worker/tasks/test_controller_tasks.py b/octavia/tests/unit/controller/worker/tasks/test_controller_tasks.py deleted file mode 100644 index 3ffdaffaff..0000000000 --- a/octavia/tests/unit/controller/worker/tasks/test_controller_tasks.py +++ /dev/null @@ -1,80 +0,0 @@ -# Copyright 2015 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock -from oslo_utils import uuidutils - -from octavia.controller.worker import controller_worker -from octavia.controller.worker.tasks import controller_tasks -from octavia.db import repositories as repo -import octavia.tests.unit.base as base - - -AMP_ID = uuidutils.generate_uuid() -LB1_ID = uuidutils.generate_uuid() -LB2_ID = uuidutils.generate_uuid() -LISTENER1_ID = uuidutils.generate_uuid() -LISTENER2_ID = uuidutils.generate_uuid() - -_lb1_mock = mock.MagicMock() -_lb1_mock.id = LB1_ID -_lb2_mock = mock.MagicMock() -_lb2_mock.id = LB2_ID -_lbs = [_lb1_mock, _lb2_mock] - -_listener1_mock = mock.MagicMock() -_listener1_mock.id = LISTENER1_ID -_listener1_mock.enabled = False -_listener2_mock = mock.MagicMock() -_listener2_mock.id = LISTENER2_ID -_listener2_mock.enabled = True -_listeners = [_listener1_mock, _listener2_mock] - - -@mock.patch('octavia.db.api.get_session', return_value='TEST') -class TestControllerTasks(base.TestCase): - - def setUp(self): - - self.amphora_mock = mock.MagicMock() - self.amphora_mock.id = AMP_ID - - self.loadbalancer_mock = mock.MagicMock() - self.loadbalancer_mock.id = LB1_ID - self.loadbalancer_mock.enabled = True - - super(TestControllerTasks, self).setUp() - - @mock.patch('octavia.controller.worker.controller_worker.' - 'ControllerWorker.delete_load_balancer') - @mock.patch('octavia.db.repositories.AmphoraRepository.' - 'get_all_lbs_on_amphora', - return_value=_lbs) - def test_delete_load_balancers_on_amp(self, - mock_get_all_lbs_on_amp, - mock_delete_lb, - mock_get_session): - - delete_lbs_on_amp = controller_tasks.DeleteLoadBalancersOnAmp() - delete_lbs_on_amp.execute(self.amphora_mock) - - repo.AmphoraRepository.get_all_lbs_on_amphora.assert_called_once_with( - 'TEST', - amphora_id=AMP_ID) - - (controller_worker. - ControllerWorker.delete_load_balancer.assert_has_calls)([ - mock.call(LB1_ID), - mock.call(LB2_ID)], any_order=False) diff --git a/octavia/tests/unit/controller/worker/test_controller_worker.py b/octavia/tests/unit/controller/worker/test_controller_worker.py index fdb7e68eab..c8b5ab00d7 100644 --- a/octavia/tests/unit/controller/worker/test_controller_worker.py +++ b/octavia/tests/unit/controller/worker/test_controller_worker.py @@ -1228,13 +1228,13 @@ class TestControllerWorker(base.TestCase): 'amphora_flows.AmphoraFlows.get_failover_flow', return_value=_flow_mock) @mock.patch( - 'octavia.db.repositories.AmphoraRepository.get_all_lbs_on_amphora', - return_value=[_load_balancer_mock]) + 'octavia.db.repositories.AmphoraRepository.get_lb_for_amphora', + return_value=_load_balancer_mock) @mock.patch('octavia.db.repositories.LoadBalancerRepository.update') def test_failover_amphora_anti_affinity(self, mock_update, mock_get_update_listener_flow, - mock_get_all_lbs_for_amp_mock, + mock_get_lb_for_amphora, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load,