diff --git a/octavia/amphorae/drivers/health/heartbeat_udp.py b/octavia/amphorae/drivers/health/heartbeat_udp.py index 89e4d973ff..c4b07cdbf3 100644 --- a/octavia/amphorae/drivers/health/heartbeat_udp.py +++ b/octavia/amphorae/drivers/health/heartbeat_udp.py @@ -29,22 +29,22 @@ CONF = cfg.CONF LOG = logging.getLogger(__name__) -def update_health(obj): +def update_health(obj, srcaddr): handler = stevedore_driver.DriverManager( namespace='octavia.amphora.health_update_drivers', name=CONF.health_manager.health_update_driver, invoke_on_load=True ).driver - handler.update_health(obj) + handler.update_health(obj, srcaddr) -def update_stats(obj): +def update_stats(obj, srcaddr): handler = stevedore_driver.DriverManager( namespace='octavia.amphora.stats_update_drivers', name=CONF.health_manager.stats_update_driver, invoke_on_load=True ).driver - handler.update_stats(obj) + handler.update_stats(obj, srcaddr) class UDPStatusGetter(object): @@ -193,7 +193,7 @@ class UDPStatusGetter(object): 'Exception: %s', srcaddr, e) raise exceptions.InvalidHMACException() obj['recv_time'] = time.time() - return obj, srcaddr + return obj, srcaddr[0] def check(self): try: @@ -209,5 +209,5 @@ class UDPStatusGetter(object): 'heartbeat packet. Ignoring this packet. ' 'Exception: %s', e) else: - self.executor.submit(update_health, obj) - self.executor.submit(update_stats, obj) + self.executor.submit(update_health, obj, srcaddr) + self.executor.submit(update_stats, obj, srcaddr) diff --git a/octavia/common/constants.py b/octavia/common/constants.py index fed20fb014..b60579c240 100644 --- a/octavia/common/constants.py +++ b/octavia/common/constants.py @@ -400,6 +400,8 @@ MAX_QUOTA = 2000000000 API_VERSION = '0.5' +NOOP_EVENT_STREAMER = 'noop_event_streamer' + HAPROXY_BASE_PEER_PORT = 1025 KEEPALIVED_JINJA2_UPSTART = 'keepalived.upstart.j2' KEEPALIVED_JINJA2_SYSTEMD = 'keepalived.systemd.j2' diff --git a/octavia/controller/healthmanager/health_drivers/update_base.py b/octavia/controller/healthmanager/health_drivers/update_base.py index b9007b6783..24056deb68 100644 --- a/octavia/controller/healthmanager/health_drivers/update_base.py +++ b/octavia/controller/healthmanager/health_drivers/update_base.py @@ -17,11 +17,11 @@ import abc class HealthUpdateBase(object): @abc.abstractmethod - def update_health(self, health): + def update_health(self, health, srcaddr): raise NotImplementedError() class StatsUpdateBase(object): @abc.abstractmethod - def update_stats(self, health_message): + def update_stats(self, health_message, srcaddr): raise NotImplementedError() diff --git a/octavia/controller/healthmanager/health_drivers/update_db.py b/octavia/controller/healthmanager/health_drivers/update_db.py index c17cc35e91..7fb7d62a76 100644 --- a/octavia/controller/healthmanager/health_drivers/update_db.py +++ b/octavia/controller/healthmanager/health_drivers/update_db.py @@ -68,22 +68,24 @@ class UpdateHealthDb(update_base.HealthUpdateBase): message.update({constants.OPERATING_STATUS: new_op_status}) if self.sync_prv_status: LOG.debug("%s %s provisioning_status %s. " - "Updating db and sending event.", + "Sending event.", entity_type, entity_id, current_prov_status) message.update( {constants.PROVISIONING_STATUS: current_prov_status}) if message: self.emit(entity_type, entity_id, message) - def update_health(self, health): + def update_health(self, health, srcaddr): # The executor will eat any exceptions from the update_health code # so we need to wrap it and log the unhandled exception try: - self._update_health(health) + self._update_health(health, srcaddr) except Exception: - LOG.exception('update_health encountered an unknown error') + LOG.exception('update_health encountered an unknown error ' + 'processing health message for amphora {0} with IP ' + '{1}'.format(health['id'], srcaddr)) - def _update_health(self, health): + def _update_health(self, health, srcaddr): """This function is to update db info based on amphora status :param health: map object that contains amphora, listener, member info @@ -110,19 +112,30 @@ class UpdateHealthDb(update_base.HealthUpdateBase): session = db_api.get_session() # We need to see if all of the listeners are reporting in - expected_listener_count = 0 - db_lbs_on_amp = self.amphora_repo.get_all_lbs_on_amphora(session, - health['id']) + db_lb = self.amphora_repo.get_lb_for_amphora(session, health['id']) ignore_listener_count = False listeners = health['listeners'] + if not db_lb: + # If this is not a spare amp, log and skip it. + amp = self.amphora_repo.get(session, id=health['id']) + if not amp or amp.load_balancer_id: + # This is debug and not warning because this can happen under + # normal deleting operations. + LOG.debug('Received a health heartbeat from amphora {0} with ' + 'IP {1} that should not exist. This amphora may be ' + 'in the process of being deleted, in which case you ' + 'will only see this message a few times. However if ' + 'it is repeating this amphora should be manually ' + 'deleted.'.format(health['id'], srcaddr)) + return + # We need to loop over the lbs here to make sure we update the # amphora_health record as soon as possible to prevent unnecessary # failovers. Unfortunately that means looping over this list twice. - for db_lb in db_lbs_on_amp: - expected_listener_count += len(db_lb.listeners) - if 'PENDING' in db_lb.provisioning_status: - ignore_listener_count = True + expected_listener_count = len(db_lb.listeners) + if 'PENDING' in db_lb.provisioning_status: + ignore_listener_count = True # Do not update amphora health if the reporting listener count # does not match the expected listener count @@ -157,78 +170,76 @@ class UpdateHealthDb(update_base.HealthUpdateBase): {'id': health['id'], 'found': len(listeners), 'expected': expected_listener_count}) - for db_lb in db_lbs_on_amp: + processed_pools = [] - processed_pools = [] + # We got a heartbeat so lb is healthy until proven otherwise + if db_lb.enabled is False: + lb_status = constants.OFFLINE + else: + lb_status = constants.ONLINE - # We got a heartbeat so lb is healthy until proven otherwise - if db_lb.enabled is False: - lb_status = constants.OFFLINE + for db_listener in db_lb.listeners: + listener_status = None + listener_id = db_listener.id + listener = None + + if listener_id not in listeners: + listener_status = constants.OFFLINE else: - lb_status = constants.ONLINE + listener = listeners[listener_id] - for db_listener in db_lb.listeners: - listener_status = None - listener_id = db_listener.id - listener = None - - if listener_id not in listeners: - listener_status = constants.OFFLINE + # OPEN = HAProxy listener status nbconn < maxconn + if listener.get('status') == constants.OPEN: + listener_status = constants.ONLINE + # FULL = HAProxy listener status not nbconn < maxconn + elif listener.get('status') == constants.FULL: + listener_status = constants.DEGRADED + if lb_status == constants.ONLINE: + lb_status = constants.DEGRADED else: - listener = listeners[listener_id] + LOG.warning(('Listener %(list)s reported status of ' + '%(status)s'), + {'list': listener_id, + 'status': listener.get('status')}) - # OPEN = HAProxy listener status nbconn < maxconn - if listener.get('status') == constants.OPEN: - listener_status = constants.ONLINE - # FULL = HAProxy listener status not nbconn < maxconn - elif listener.get('status') == constants.FULL: - listener_status = constants.DEGRADED - if lb_status == constants.ONLINE: - lb_status = constants.DEGRADED - else: - LOG.warning(('Listener %(list)s reported status of ' - '%(status)s'), - {'list': listener_id, - 'status': listener.get('status')}) - - try: - if listener_status is not None: - self._update_status_and_emit_event( - session, self.listener_repo, constants.LISTENER, - listener_id, listener_status, - db_listener.operating_status, - db_listener.provisioning_status - ) - except sqlalchemy.orm.exc.NoResultFound: - LOG.error("Listener %s is not in DB", listener_id) - - if not listener: - continue - - pools = listener['pools'] - - # Process pools bound to listeners - for db_pool in db_listener.pools: - lb_status = self._process_pool_status( - session, db_pool, pools, lb_status, processed_pools) - - # Process pools bound to the load balancer - for db_pool in db_lb.pools: - # Don't re-process pools shared with listeners - if db_pool.id in processed_pools: - continue - lb_status = self._process_pool_status( - session, db_pool, [], lb_status, processed_pools) - - # Update the load balancer status last try: - self._update_status_and_emit_event( - session, self.loadbalancer_repo, - constants.LOADBALANCER, db_lb.id, lb_status, - db_lb.operating_status, db_lb.provisioning_status - ) + if listener_status is not None: + self._update_status_and_emit_event( + session, self.listener_repo, constants.LISTENER, + listener_id, listener_status, + db_listener.operating_status, + db_listener.provisioning_status + ) except sqlalchemy.orm.exc.NoResultFound: - LOG.error("Load balancer %s is not in DB", db_lb.id) + LOG.error("Listener %s is not in DB", listener_id) + + if not listener: + continue + + pools = listener['pools'] + + # Process pools bound to listeners + for db_pool in db_listener.pools: + lb_status = self._process_pool_status( + session, db_pool, pools, lb_status, processed_pools) + + # Process pools bound to the load balancer + for db_pool in db_lb.pools: + # Don't re-process pools shared with listeners + if db_pool.id in processed_pools: + continue + lb_status = self._process_pool_status( + session, db_pool, [], lb_status, processed_pools) + + # Update the load balancer status last + try: + self._update_status_and_emit_event( + session, self.loadbalancer_repo, + constants.LOADBALANCER, db_lb.id, lb_status, + db_lb.operating_status, db_lb.provisioning_status + ) + except sqlalchemy.orm.exc.NoResultFound: + LOG.error("Load balancer %s is not in DB", db_lb.id) LOG.debug('Health Update finished in: {0} seconds'.format( timeit.default_timer() - start_time)) @@ -335,15 +346,17 @@ class UpdateStatsDb(update_base.StatsUpdateBase, stats.StatsMixin): cnt = update_serializer.InfoContainer(info_type, info_id, info_obj) self.event_streamer.emit(cnt) - def update_stats(self, health_message): + def update_stats(self, health_message, srcaddr): # The executor will eat any exceptions from the update_stats code # so we need to wrap it and log the unhandled exception try: - self._update_stats(health_message) + self._update_stats(health_message, srcaddr) except Exception: - LOG.exception('update_stats encountered an unknown error') + LOG.exception('update_stats encountered an unknown error ' + 'processing stats for amphora {0} with IP ' + '{1}'.format(health_message['id'], srcaddr)) - def _update_stats(self, health_message): + def _update_stats(self, health_message, srcaddr): """This function is to update the db with listener stats :param health_message: The health message containing the listener stats @@ -392,12 +405,21 @@ class UpdateStatsDb(update_base.StatsUpdateBase, stats.StatsMixin): self.listener_stats_repo.replace( session, listener_id, amphora_id, **stats) - listener_stats = self.get_listener_stats(session, listener_id) - self.emit( - 'listener_stats', listener_id, listener_stats.get_stats()) + if (CONF.health_manager.event_streamer_driver != + constants.NOOP_EVENT_STREAMER): + listener_stats = self.get_listener_stats(session, listener_id) + self.emit( + 'listener_stats', listener_id, listener_stats.get_stats()) - listener_db = self.repo_listener.get(session, id=listener_id) - lb_stats = self.get_loadbalancer_stats( - session, listener_db.load_balancer_id) - self.emit('loadbalancer_stats', - listener_db.load_balancer_id, lb_stats.get_stats()) + listener_db = self.repo_listener.get(session, id=listener_id) + if not listener_db: + LOG.debug('Received health stats for a non-existent ' + 'listener {0} for amphora {1} with IP ' + '{2}.'.format(listener_id, amphora_id, + srcaddr)) + return + + lb_stats = self.get_loadbalancer_stats( + session, listener_db.load_balancer_id) + self.emit('loadbalancer_stats', + listener_db.load_balancer_id, lb_stats.get_stats()) diff --git a/octavia/controller/healthmanager/health_drivers/update_logging.py b/octavia/controller/healthmanager/health_drivers/update_logging.py index f5ada76d95..156657cd9f 100644 --- a/octavia/controller/healthmanager/health_drivers/update_logging.py +++ b/octavia/controller/healthmanager/health_drivers/update_logging.py @@ -20,10 +20,10 @@ LOG = logging.getLogger(__name__) class HealthUpdateLogger(update_base.HealthUpdateBase): - def update_health(self, health): + def update_health(self, health, srcaddr): LOG.info("Health update triggered for: %s", health.get('id')) class StatsUpdateLogger(update_base.StatsUpdateBase): - def update_stats(self, health_message): + def update_stats(self, health_message, srcaddr): LOG.info("Stats update triggered for: %s", health_message.get('id')) diff --git a/octavia/controller/worker/controller_worker.py b/octavia/controller/worker/controller_worker.py index ca61a5aa7f..db16ab293a 100644 --- a/octavia/controller/worker/controller_worker.py +++ b/octavia/controller/worker/controller_worker.py @@ -748,11 +748,10 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): # if we run with anti-affinity we need to set the server group # as well if CONF.nova.enable_anti_affinity: - lb = self._amphora_repo.get_all_lbs_on_amphora( + lb = self._amphora_repo.get_lb_for_amphora( db_apis.get_session(), amp.id) if lb: - stored_params[constants.SERVER_GROUP_ID] = ( - lb[0].server_group_id) + stored_params[constants.SERVER_GROUP_ID] = lb.server_group_id failover_amphora_tf = self._taskflow_load( self._amphora_flows.get_failover_flow( diff --git a/octavia/controller/worker/tasks/controller_tasks.py b/octavia/controller/worker/tasks/controller_tasks.py deleted file mode 100644 index deac28e68e..0000000000 --- a/octavia/controller/worker/tasks/controller_tasks.py +++ /dev/null @@ -1,50 +0,0 @@ -# Copyright 2015 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -from taskflow import task - -from octavia.db import api as db_apis -from octavia.db import repositories as repo - - -class BaseControllerTask(task.Task): - """Base task to load drivers common to the tasks.""" - - def __init__(self, **kwargs): - from octavia.controller.worker import controller_worker - self.cntrlr_worker = controller_worker.ControllerWorker() - self.listener_repo = repo.ListenerRepository() - self.amp_repo = repo.AmphoraRepository() - self.pool_repo = repo.PoolRepository() - super(BaseControllerTask, self).__init__(**kwargs) - - -# todo(xgerman): Make sure this is used outside tests -class DeleteLoadBalancersOnAmp(BaseControllerTask): - """Delete the load balancers on an amphora.""" - - def execute(self, amphora): - """Deletes the load balancers on an amphora. - - Iterate across the load balancers on an amphora and - call back into the controller worker to delete the - load balancers. - - :param amphora: The amphora to delete the load balancers from - """ - lbs = self.amp_repo.get_all_lbs_on_amphora(db_apis.get_session(), - amphora_id=amphora.id) - for lb in lbs: - self.cntrlr_worker.delete_load_balancer(lb.id) diff --git a/octavia/db/repositories.py b/octavia/db/repositories.py index 24853902f9..f856354649 100644 --- a/octavia/db/repositories.py +++ b/octavia/db/repositories.py @@ -938,21 +938,33 @@ class AmphoraRepository(BaseRepository): return amp.to_data_model() - def get_all_lbs_on_amphora(self, session, amphora_id): + @staticmethod + def get_lb_for_amphora(session, amphora_id): """Get all of the load balancers on an amphora. :param session: A Sql Alchemy database session. :param amphora_id: The amphora id to list the load balancers from :returns: [octavia.common.data_model] """ - with session.begin(subtransactions=True): - lb_subquery = (session.query(self.model_class.load_balancer_id). - filter_by(id=amphora_id).subquery()) - lb_list = (session.query(models.LoadBalancer). - filter(models.LoadBalancer.id.in_(lb_subquery)). - options(joinedload('*')).all()) - data_model_list = [model.to_data_model() for model in lb_list] - return data_model_list + with session.begin(): + db_lb = ( + # Get LB records + session.query(models.LoadBalancer) + # Joined to amphora records + .filter(models.LoadBalancer.id == + models.Amphora.load_balancer_id) + # For just this amphora + .filter(models.Amphora.id == amphora_id) + # Where the amphora is not DELETED + .filter(models.Amphora.status != consts.DELETED) + # And the LB is also not DELETED + .filter(models.LoadBalancer.provisioning_status != + consts.DELETED) + # And what does this do? Some SQLAlchemy magic? + .options(joinedload('*')) + ).first() + if db_lb: + return db_lb.to_data_model() def get_all_deleted_expiring_amphora(self, session, exp_age=None): diff --git a/octavia/tests/functional/db/test_repositories.py b/octavia/tests/functional/db/test_repositories.py index ddaab2c05b..301c98609c 100644 --- a/octavia/tests/functional/db/test_repositories.py +++ b/octavia/tests/functional/db/test_repositories.py @@ -3032,13 +3032,12 @@ class AmphoraRepositoryTest(BaseRepositoryTest): self.assertIsNotNone(new_amphora) self.assertIsInstance(new_amphora, models.Amphora) - def test_get_all_lbs_on_amphora(self): + def test_get_lb_for_amphora(self): amphora = self.create_amphora(self.FAKE_UUID_1) self.amphora_repo.associate(self.session, self.lb.id, amphora.id) - lb_list = self.amphora_repo.get_all_lbs_on_amphora(self.session, - amphora.id) - self.assertIsNotNone(lb_list) - self.assertIn(self.lb, lb_list) + lb = self.amphora_repo.get_lb_for_amphora(self.session, amphora.id) + self.assertIsNotNone(lb) + self.assertEqual(self.lb, lb) def get_all_deleted_expiring_amphora(self): exp_age = datetime.timedelta(seconds=self.FAKE_EXP_AGE) diff --git a/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py b/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py index 2aafd65f2b..f58b27c551 100644 --- a/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py +++ b/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py @@ -62,24 +62,26 @@ class TestHeartbeatUDP(base.TestCase): @mock.patch('stevedore.driver.DriverManager') def test_update_health_func(self, driver_manager): obj = {'id': 1} - heartbeat_udp.update_health(obj) + heartbeat_udp.update_health(obj, '192.0.2.1') driver_manager.assert_called_once_with( invoke_on_load=True, name='health_logger', namespace='octavia.amphora.health_update_drivers' ) - driver_manager().driver.update_health.assert_called_once_with(obj) + driver_manager().driver.update_health.assert_called_once_with( + obj, '192.0.2.1') @mock.patch('stevedore.driver.DriverManager') def test_update_stats_func(self, driver_manager): obj = {'id': 1} - heartbeat_udp.update_stats(obj) + heartbeat_udp.update_stats(obj, '192.0.2.1') driver_manager.assert_called_once_with( invoke_on_load=True, name='stats_logger', namespace='octavia.amphora.stats_update_drivers' ) - driver_manager().driver.update_stats.assert_called_once_with(obj) + driver_manager().driver.update_stats.assert_called_once_with( + obj, '192.0.2.1') @mock.patch('socket.getaddrinfo') @mock.patch('socket.socket') @@ -117,9 +119,9 @@ class TestHeartbeatUDP(base.TestCase): '1aa050041b506245806e5c1971e79951818394e' 'a6e71ad989ff950945f9573f4ab6f83e25db8ed7') bin_msg = binascii.unhexlify(sample_msg) - recvfrom.return_value = bin_msg, 2 + recvfrom.return_value = bin_msg, ('192.0.2.1', 2) (obj, srcaddr) = getter.dorecv() - self.assertEqual(2, srcaddr) + self.assertEqual('192.0.2.1', srcaddr) self.assertIsNotNone(obj.pop('recv_time')) self.assertEqual({"testkey": "TEST"}, obj) @@ -163,8 +165,8 @@ class TestHeartbeatUDP(base.TestCase): getter.check() getter.executor.shutdown() mock_executor.submit.assert_has_calls( - [mock.call(heartbeat_udp.update_health, {'id': 1}), - mock.call(heartbeat_udp.update_stats, {'id': 1})]) + [mock.call(heartbeat_udp.update_health, {'id': 1}, 2), + mock.call(heartbeat_udp.update_stats, {'id': 1}, 2)]) @mock.patch('socket.getaddrinfo') @mock.patch('socket.socket') diff --git a/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_base.py b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_base.py index 1fa01e469e..d6afda28bd 100644 --- a/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_base.py +++ b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_base.py @@ -25,7 +25,7 @@ class TestHealthUpdateBase(base.TestCase): def test_update_health(self): self.assertRaises(NotImplementedError, - self.logger.update_health, {'id': 1}) + self.logger.update_health, {'id': 1}, '192.0.2.1') class TestStatsUpdateBase(base.TestCase): @@ -35,4 +35,4 @@ class TestStatsUpdateBase(base.TestCase): def test_update_stats(self): self.assertRaises(NotImplementedError, - self.logger.update_stats, {'id': 1}) + self.logger.update_stats, {'id': 1}, '192.0.2.1') diff --git a/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py index 291867b00e..ab18bcb520 100644 --- a/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py +++ b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py @@ -66,7 +66,7 @@ class TestUpdateHealthDb(base.TestCase): self.hm.amphora_repo = self.amphora_repo fake_lb = mock.MagicMock() - self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [fake_lb] + self.hm.amphora_repo.get_lb_for_amphora.return_value = fake_lb self.hm.amphora_health_repo = self.amphora_health_repo self.hm.listener_repo = self.listener_repo self.hm.listener_repo.count.return_value = 1 @@ -132,9 +132,9 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') self.event_client.cast.assert_any_call( {}, 'update_info', container={ 'info_type': 'listener', 'info_id': 'listener-id-1', @@ -158,10 +158,10 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(listener=False, pool=False)) - self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) - self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called) + self.hm.update_health(health, '192.0.2.1') + self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) self.assertTrue(self.loadbalancer_repo.update.called) self.assertTrue(self.amphora_health_repo.replace.called) @@ -176,10 +176,10 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(listener=True, pool=False, lb_prov_status=constants.PENDING_UPDATE)) - self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) - self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called) + self.hm.update_health(health, '192.0.2.1') + self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) self.assertTrue(self.loadbalancer_repo.update.called) self.assertTrue(self.amphora_health_repo.replace.called) @@ -193,10 +193,10 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(listener=True, pool=False)) - self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) - self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called) + self.hm.update_health(health, '192.0.2.1') + self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) self.assertTrue(self.loadbalancer_repo.update.called) self.assertFalse(self.amphora_health_repo.replace.called) @@ -210,10 +210,10 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(listener=False, pool=False)) - self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) - self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called) + self.hm.update_health(health, '192.0.2.1') + self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) # Receive time is stale, so we shouldn't see this called self.assertFalse(self.loadbalancer_repo.update.called) @@ -221,15 +221,25 @@ class TestUpdateHealthDb(base.TestCase): health = { "id": self.FAKE_UUID_1, - "listeners": {}, + "listeners": { + "listener-id-1": {"status": constants.OPEN, "pools": { + "pool-id-1": {"status": constants.UP, + "members": {"member-id-1": constants.UP} + } + } + } + }, "recv_time": time.time() } self.session_mock.commit.side_effect = TestException('boom') - self.amphora_repo.get_all_lbs_on_amphora.return_value = [] + mock_lb, mock_listener1, mock_pool1, mock_member1 = ( + self._make_mock_lb_tree()) - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) self.session_mock.rollback.assert_called_once() @@ -250,8 +260,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -281,7 +291,7 @@ class TestUpdateHealthDb(base.TestCase): mock_lb.listeners = [mock_listener1, mock_listener2] self.amphora_health_repo.replace.reset_mock() - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') self.assertTrue(not self.amphora_health_repo.replace.called) def test_update_lb_pool_health_offline(self): @@ -297,8 +307,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -344,8 +354,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb.listeners.append(mock_listener2) mock_lb.pools.append(mock_pool2) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -381,8 +391,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -423,8 +433,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -454,8 +464,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -494,8 +504,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -534,8 +544,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -570,9 +580,9 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -613,9 +623,9 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(health_monitor=False)) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -653,9 +663,9 @@ class TestUpdateHealthDb(base.TestCase): self._make_mock_lb_tree(health_monitor=False)) mock_members[0].admin_state_up = False mock_members[0].operating_status = constants.NO_MONITOR - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -694,8 +704,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -735,9 +745,9 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(members=2)) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) - self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') + self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) # test listener, member for listener_id, listener in six.iteritems( @@ -780,8 +790,8 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] - self.hm.update_health(health) + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -811,7 +821,7 @@ class TestUpdateHealthDb(base.TestCase): mock_lb.listeners.append(mock_listener2) self.amphora_health_repo.replace.reset_mock() - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') self.assertTrue(not self.amphora_health_repo.replace.called) def test_update_health_error(self): @@ -831,9 +841,9 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -922,9 +932,9 @@ class TestUpdateHealthDb(base.TestCase): mock_listener.pools = [mock_pool] mock_lb.listeners.append(mock_listener) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') # test listener self.listener_repo.update.assert_any_call( @@ -973,9 +983,9 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree()) - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member @@ -1024,9 +1034,9 @@ class TestUpdateHealthDb(base.TestCase): mock_pool1.operating_status = constants.ONLINE mock_listener1.operating_status = constants.ONLINE mock_lb.operating_status = constants.ONLINE - self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) + self.hm.update_health(health, '192.0.2.1') self.event_client.cast.assert_not_called() self.loadbalancer_repo.update.assert_not_called() self.listener_repo.update.assert_not_called() @@ -1042,10 +1052,10 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(listener=False, pool=False)) mock_lb.enabled = False - self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) - self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called) + self.hm.update_health(health, '192.0.2.1') + self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) self.assertTrue(self.loadbalancer_repo.update.called) self.loadbalancer_repo.update.assert_called_with( self.mock_session(), mock_lb.id, @@ -1060,10 +1070,10 @@ class TestUpdateHealthDb(base.TestCase): mock_lb, mock_listener1, mock_pool1, mock_members = ( self._make_mock_lb_tree(listener=False, pool=False)) mock_lb.enabled = True - self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb] + self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb - self.hm.update_health(health) - self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called) + self.hm.update_health(health, '192.0.2.1') + self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) self.assertTrue(self.loadbalancer_repo.update.called) self.loadbalancer_repo.update.assert_called_with( self.mock_session(), mock_lb.id, @@ -1146,7 +1156,7 @@ class TestUpdateStatsDb(base.TestCase): session.return_value = 'blah' - self.sm.update_stats(health) + self.sm.update_stats(health, '192.0.2.1') self.listener_stats_repo.replace.assert_called_once_with( 'blah', self.listener_id, self.amphora_id, diff --git a/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_logging.py b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_logging.py index 693efc435f..b0669cc6eb 100644 --- a/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_logging.py +++ b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_logging.py @@ -28,7 +28,7 @@ class TestHealthUpdateLogger(base.TestCase): @mock.patch('octavia.controller.healthmanager.health_drivers' '.update_logging.LOG') def test_update_health(self, mock_log): - self.logger.update_health({'id': 1}) + self.logger.update_health({'id': 1}, '192.0.2.1') self.assertEqual(1, mock_log.info.call_count) @@ -40,5 +40,5 @@ class TestStatsUpdateLogger(base.TestCase): @mock.patch('octavia.controller.healthmanager.health_drivers' '.update_logging.LOG') def test_update_stats(self, mock_log): - self.logger.update_stats({'id': 1}) + self.logger.update_stats({'id': 1}, '192.0.2.1') self.assertEqual(1, mock_log.info.call_count) diff --git a/octavia/tests/unit/controller/worker/tasks/test_controller_tasks.py b/octavia/tests/unit/controller/worker/tasks/test_controller_tasks.py deleted file mode 100644 index 3ffdaffaff..0000000000 --- a/octavia/tests/unit/controller/worker/tasks/test_controller_tasks.py +++ /dev/null @@ -1,80 +0,0 @@ -# Copyright 2015 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock -from oslo_utils import uuidutils - -from octavia.controller.worker import controller_worker -from octavia.controller.worker.tasks import controller_tasks -from octavia.db import repositories as repo -import octavia.tests.unit.base as base - - -AMP_ID = uuidutils.generate_uuid() -LB1_ID = uuidutils.generate_uuid() -LB2_ID = uuidutils.generate_uuid() -LISTENER1_ID = uuidutils.generate_uuid() -LISTENER2_ID = uuidutils.generate_uuid() - -_lb1_mock = mock.MagicMock() -_lb1_mock.id = LB1_ID -_lb2_mock = mock.MagicMock() -_lb2_mock.id = LB2_ID -_lbs = [_lb1_mock, _lb2_mock] - -_listener1_mock = mock.MagicMock() -_listener1_mock.id = LISTENER1_ID -_listener1_mock.enabled = False -_listener2_mock = mock.MagicMock() -_listener2_mock.id = LISTENER2_ID -_listener2_mock.enabled = True -_listeners = [_listener1_mock, _listener2_mock] - - -@mock.patch('octavia.db.api.get_session', return_value='TEST') -class TestControllerTasks(base.TestCase): - - def setUp(self): - - self.amphora_mock = mock.MagicMock() - self.amphora_mock.id = AMP_ID - - self.loadbalancer_mock = mock.MagicMock() - self.loadbalancer_mock.id = LB1_ID - self.loadbalancer_mock.enabled = True - - super(TestControllerTasks, self).setUp() - - @mock.patch('octavia.controller.worker.controller_worker.' - 'ControllerWorker.delete_load_balancer') - @mock.patch('octavia.db.repositories.AmphoraRepository.' - 'get_all_lbs_on_amphora', - return_value=_lbs) - def test_delete_load_balancers_on_amp(self, - mock_get_all_lbs_on_amp, - mock_delete_lb, - mock_get_session): - - delete_lbs_on_amp = controller_tasks.DeleteLoadBalancersOnAmp() - delete_lbs_on_amp.execute(self.amphora_mock) - - repo.AmphoraRepository.get_all_lbs_on_amphora.assert_called_once_with( - 'TEST', - amphora_id=AMP_ID) - - (controller_worker. - ControllerWorker.delete_load_balancer.assert_has_calls)([ - mock.call(LB1_ID), - mock.call(LB2_ID)], any_order=False) diff --git a/octavia/tests/unit/controller/worker/test_controller_worker.py b/octavia/tests/unit/controller/worker/test_controller_worker.py index fdb7e68eab..c8b5ab00d7 100644 --- a/octavia/tests/unit/controller/worker/test_controller_worker.py +++ b/octavia/tests/unit/controller/worker/test_controller_worker.py @@ -1228,13 +1228,13 @@ class TestControllerWorker(base.TestCase): 'amphora_flows.AmphoraFlows.get_failover_flow', return_value=_flow_mock) @mock.patch( - 'octavia.db.repositories.AmphoraRepository.get_all_lbs_on_amphora', - return_value=[_load_balancer_mock]) + 'octavia.db.repositories.AmphoraRepository.get_lb_for_amphora', + return_value=_load_balancer_mock) @mock.patch('octavia.db.repositories.LoadBalancerRepository.update') def test_failover_amphora_anti_affinity(self, mock_update, mock_get_update_listener_flow, - mock_get_all_lbs_for_amp_mock, + mock_get_lb_for_amphora, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load,