Improve the error logging for zombie amphora

In the case that nova failed to delete an amphroa they will continue to send
health heartbeat messages the the health manager. This patch improves the
logging of these amphora.

It also optimizes the statistics update flow when event streaming is
disabled by removing two extra database calls.

This patch also removes the un-used BaseControllerTask class.

This patch also finally solidifies that there will be one LB per amphora.

Change-Id: Idf83b19216c680a4854c1239ed9c5bc5ce7364a7
changes/69/561369/9
Michael Johnson 5 years ago committed by Adam Harwell
parent 96cce3ed74
commit 1417f6f0f8
  1. 14
      octavia/amphorae/drivers/health/heartbeat_udp.py
  2. 2
      octavia/common/constants.py
  3. 4
      octavia/controller/healthmanager/health_drivers/update_base.py
  4. 196
      octavia/controller/healthmanager/health_drivers/update_db.py
  5. 4
      octavia/controller/healthmanager/health_drivers/update_logging.py
  6. 5
      octavia/controller/worker/controller_worker.py
  7. 50
      octavia/controller/worker/tasks/controller_tasks.py
  8. 30
      octavia/db/repositories.py
  9. 9
      octavia/tests/functional/db/test_repositories.py
  10. 18
      octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py
  11. 4
      octavia/tests/unit/controller/healthmanager/health_drivers/test_update_base.py
  12. 138
      octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py
  13. 4
      octavia/tests/unit/controller/healthmanager/health_drivers/test_update_logging.py
  14. 80
      octavia/tests/unit/controller/worker/tasks/test_controller_tasks.py
  15. 6
      octavia/tests/unit/controller/worker/test_controller_worker.py

@ -29,22 +29,22 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def update_health(obj):
def update_health(obj, srcaddr):
handler = stevedore_driver.DriverManager(
namespace='octavia.amphora.health_update_drivers',
name=CONF.health_manager.health_update_driver,
invoke_on_load=True
).driver
handler.update_health(obj)
handler.update_health(obj, srcaddr)
def update_stats(obj):
def update_stats(obj, srcaddr):
handler = stevedore_driver.DriverManager(
namespace='octavia.amphora.stats_update_drivers',
name=CONF.health_manager.stats_update_driver,
invoke_on_load=True
).driver
handler.update_stats(obj)
handler.update_stats(obj, srcaddr)
class UDPStatusGetter(object):
@ -193,7 +193,7 @@ class UDPStatusGetter(object):
'Exception: %s', srcaddr, e)
raise exceptions.InvalidHMACException()
obj['recv_time'] = time.time()
return obj, srcaddr
return obj, srcaddr[0]
def check(self):
try:
@ -209,5 +209,5 @@ class UDPStatusGetter(object):
'heartbeat packet. Ignoring this packet. '
'Exception: %s', e)
else:
self.executor.submit(update_health, obj)
self.executor.submit(update_stats, obj)
self.executor.submit(update_health, obj, srcaddr)
self.executor.submit(update_stats, obj, srcaddr)

@ -400,6 +400,8 @@ MAX_QUOTA = 2000000000
API_VERSION = '0.5'
NOOP_EVENT_STREAMER = 'noop_event_streamer'
HAPROXY_BASE_PEER_PORT = 1025
KEEPALIVED_JINJA2_UPSTART = 'keepalived.upstart.j2'
KEEPALIVED_JINJA2_SYSTEMD = 'keepalived.systemd.j2'

@ -17,11 +17,11 @@ import abc
class HealthUpdateBase(object):
@abc.abstractmethod
def update_health(self, health):
def update_health(self, health, srcaddr):
raise NotImplementedError()
class StatsUpdateBase(object):
@abc.abstractmethod
def update_stats(self, health_message):
def update_stats(self, health_message, srcaddr):
raise NotImplementedError()

@ -68,22 +68,24 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
message.update({constants.OPERATING_STATUS: new_op_status})
if self.sync_prv_status:
LOG.debug("%s %s provisioning_status %s. "
"Updating db and sending event.",
"Sending event.",
entity_type, entity_id, current_prov_status)
message.update(
{constants.PROVISIONING_STATUS: current_prov_status})
if message:
self.emit(entity_type, entity_id, message)
def update_health(self, health):
def update_health(self, health, srcaddr):
# The executor will eat any exceptions from the update_health code
# so we need to wrap it and log the unhandled exception
try:
self._update_health(health)
self._update_health(health, srcaddr)
except Exception:
LOG.exception('update_health encountered an unknown error')
LOG.exception('update_health encountered an unknown error '
'processing health message for amphora {0} with IP '
'{1}'.format(health['id'], srcaddr))
def _update_health(self, health):
def _update_health(self, health, srcaddr):
"""This function is to update db info based on amphora status
:param health: map object that contains amphora, listener, member info
@ -110,19 +112,30 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
session = db_api.get_session()
# We need to see if all of the listeners are reporting in
expected_listener_count = 0
db_lbs_on_amp = self.amphora_repo.get_all_lbs_on_amphora(session,
health['id'])
db_lb = self.amphora_repo.get_lb_for_amphora(session, health['id'])
ignore_listener_count = False
listeners = health['listeners']
if not db_lb:
# If this is not a spare amp, log and skip it.
amp = self.amphora_repo.get(session, id=health['id'])
if not amp or amp.load_balancer_id:
# This is debug and not warning because this can happen under
# normal deleting operations.
LOG.debug('Received a health heartbeat from amphora {0} with '
'IP {1} that should not exist. This amphora may be '
'in the process of being deleted, in which case you '
'will only see this message a few times. However if '
'it is repeating this amphora should be manually '
'deleted.'.format(health['id'], srcaddr))
return
# We need to loop over the lbs here to make sure we update the
# amphora_health record as soon as possible to prevent unnecessary
# failovers. Unfortunately that means looping over this list twice.
for db_lb in db_lbs_on_amp:
expected_listener_count += len(db_lb.listeners)
if 'PENDING' in db_lb.provisioning_status:
ignore_listener_count = True
expected_listener_count = len(db_lb.listeners)
if 'PENDING' in db_lb.provisioning_status:
ignore_listener_count = True
# Do not update amphora health if the reporting listener count
# does not match the expected listener count
@ -157,78 +170,76 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
{'id': health['id'], 'found': len(listeners),
'expected': expected_listener_count})
for db_lb in db_lbs_on_amp:
processed_pools = []
processed_pools = []
# We got a heartbeat so lb is healthy until proven otherwise
if db_lb.enabled is False:
lb_status = constants.OFFLINE
else:
lb_status = constants.ONLINE
# We got a heartbeat so lb is healthy until proven otherwise
if db_lb.enabled is False:
lb_status = constants.OFFLINE
else:
lb_status = constants.ONLINE
for db_listener in db_lb.listeners:
listener_status = None
listener_id = db_listener.id
listener = None
for db_listener in db_lb.listeners:
listener_status = None
listener_id = db_listener.id
listener = None
if listener_id not in listeners:
listener_status = constants.OFFLINE
if listener_id not in listeners:
listener_status = constants.OFFLINE
else:
listener = listeners[listener_id]
# OPEN = HAProxy listener status nbconn < maxconn
if listener.get('status') == constants.OPEN:
listener_status = constants.ONLINE
# FULL = HAProxy listener status not nbconn < maxconn
elif listener.get('status') == constants.FULL:
listener_status = constants.DEGRADED
if lb_status == constants.ONLINE:
lb_status = constants.DEGRADED
else:
listener = listeners[listener_id]
# OPEN = HAProxy listener status nbconn < maxconn
if listener.get('status') == constants.OPEN:
listener_status = constants.ONLINE
# FULL = HAProxy listener status not nbconn < maxconn
elif listener.get('status') == constants.FULL:
listener_status = constants.DEGRADED
if lb_status == constants.ONLINE:
lb_status = constants.DEGRADED
else:
LOG.warning(('Listener %(list)s reported status of '
'%(status)s'),
{'list': listener_id,
'status': listener.get('status')})
try:
if listener_status is not None:
self._update_status_and_emit_event(
session, self.listener_repo, constants.LISTENER,
listener_id, listener_status,
db_listener.operating_status,
db_listener.provisioning_status
)
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Listener %s is not in DB", listener_id)
LOG.warning(('Listener %(list)s reported status of '
'%(status)s'),
{'list': listener_id,
'status': listener.get('status')})
if not listener:
continue
try:
if listener_status is not None:
self._update_status_and_emit_event(
session, self.listener_repo, constants.LISTENER,
listener_id, listener_status,
db_listener.operating_status,
db_listener.provisioning_status
)
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Listener %s is not in DB", listener_id)
pools = listener['pools']
if not listener:
continue
# Process pools bound to listeners
for db_pool in db_listener.pools:
lb_status = self._process_pool_status(
session, db_pool, pools, lb_status, processed_pools)
pools = listener['pools']
# Process pools bound to the load balancer
for db_pool in db_lb.pools:
# Don't re-process pools shared with listeners
if db_pool.id in processed_pools:
continue
# Process pools bound to listeners
for db_pool in db_listener.pools:
lb_status = self._process_pool_status(
session, db_pool, [], lb_status, processed_pools)
session, db_pool, pools, lb_status, processed_pools)
# Update the load balancer status last
try:
self._update_status_and_emit_event(
session, self.loadbalancer_repo,
constants.LOADBALANCER, db_lb.id, lb_status,
db_lb.operating_status, db_lb.provisioning_status
)
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Load balancer %s is not in DB", db_lb.id)
# Process pools bound to the load balancer
for db_pool in db_lb.pools:
# Don't re-process pools shared with listeners
if db_pool.id in processed_pools:
continue
lb_status = self._process_pool_status(
session, db_pool, [], lb_status, processed_pools)
# Update the load balancer status last
try:
self._update_status_and_emit_event(
session, self.loadbalancer_repo,
constants.LOADBALANCER, db_lb.id, lb_status,
db_lb.operating_status, db_lb.provisioning_status
)
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Load balancer %s is not in DB", db_lb.id)
LOG.debug('Health Update finished in: {0} seconds'.format(
timeit.default_timer() - start_time))
@ -335,15 +346,17 @@ class UpdateStatsDb(update_base.StatsUpdateBase, stats.StatsMixin):
cnt = update_serializer.InfoContainer(info_type, info_id, info_obj)
self.event_streamer.emit(cnt)
def update_stats(self, health_message):
def update_stats(self, health_message, srcaddr):
# The executor will eat any exceptions from the update_stats code
# so we need to wrap it and log the unhandled exception
try:
self._update_stats(health_message)
self._update_stats(health_message, srcaddr)
except Exception:
LOG.exception('update_stats encountered an unknown error')
LOG.exception('update_stats encountered an unknown error '
'processing stats for amphora {0} with IP '
'{1}'.format(health_message['id'], srcaddr))
def _update_stats(self, health_message):
def _update_stats(self, health_message, srcaddr):
"""This function is to update the db with listener stats
:param health_message: The health message containing the listener stats
@ -392,12 +405,21 @@ class UpdateStatsDb(update_base.StatsUpdateBase, stats.StatsMixin):
self.listener_stats_repo.replace(
session, listener_id, amphora_id, **stats)
listener_stats = self.get_listener_stats(session, listener_id)
self.emit(
'listener_stats', listener_id, listener_stats.get_stats())
listener_db = self.repo_listener.get(session, id=listener_id)
lb_stats = self.get_loadbalancer_stats(
session, listener_db.load_balancer_id)
self.emit('loadbalancer_stats',
listener_db.load_balancer_id, lb_stats.get_stats())
if (CONF.health_manager.event_streamer_driver !=
constants.NOOP_EVENT_STREAMER):
listener_stats = self.get_listener_stats(session, listener_id)
self.emit(
'listener_stats', listener_id, listener_stats.get_stats())
listener_db = self.repo_listener.get(session, id=listener_id)
if not listener_db:
LOG.debug('Received health stats for a non-existent '
'listener {0} for amphora {1} with IP '
'{2}.'.format(listener_id, amphora_id,
srcaddr))
return
lb_stats = self.get_loadbalancer_stats(
session, listener_db.load_balancer_id)
self.emit('loadbalancer_stats',
listener_db.load_balancer_id, lb_stats.get_stats())

@ -20,10 +20,10 @@ LOG = logging.getLogger(__name__)
class HealthUpdateLogger(update_base.HealthUpdateBase):
def update_health(self, health):
def update_health(self, health, srcaddr):
LOG.info("Health update triggered for: %s", health.get('id'))
class StatsUpdateLogger(update_base.StatsUpdateBase):
def update_stats(self, health_message):
def update_stats(self, health_message, srcaddr):
LOG.info("Stats update triggered for: %s", health_message.get('id'))

@ -748,11 +748,10 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
# if we run with anti-affinity we need to set the server group
# as well
if CONF.nova.enable_anti_affinity:
lb = self._amphora_repo.get_all_lbs_on_amphora(
lb = self._amphora_repo.get_lb_for_amphora(
db_apis.get_session(), amp.id)
if lb:
stored_params[constants.SERVER_GROUP_ID] = (
lb[0].server_group_id)
stored_params[constants.SERVER_GROUP_ID] = lb.server_group_id
failover_amphora_tf = self._taskflow_load(
self._amphora_flows.get_failover_flow(

@ -1,50 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from taskflow import task
from octavia.db import api as db_apis
from octavia.db import repositories as repo
class BaseControllerTask(task.Task):
"""Base task to load drivers common to the tasks."""
def __init__(self, **kwargs):
from octavia.controller.worker import controller_worker
self.cntrlr_worker = controller_worker.ControllerWorker()
self.listener_repo = repo.ListenerRepository()
self.amp_repo = repo.AmphoraRepository()
self.pool_repo = repo.PoolRepository()
super(BaseControllerTask, self).__init__(**kwargs)
# todo(xgerman): Make sure this is used outside tests
class DeleteLoadBalancersOnAmp(BaseControllerTask):
"""Delete the load balancers on an amphora."""
def execute(self, amphora):
"""Deletes the load balancers on an amphora.
Iterate across the load balancers on an amphora and
call back into the controller worker to delete the
load balancers.
:param amphora: The amphora to delete the load balancers from
"""
lbs = self.amp_repo.get_all_lbs_on_amphora(db_apis.get_session(),
amphora_id=amphora.id)
for lb in lbs:
self.cntrlr_worker.delete_load_balancer(lb.id)

@ -938,21 +938,33 @@ class AmphoraRepository(BaseRepository):
return amp.to_data_model()
def get_all_lbs_on_amphora(self, session, amphora_id):
@staticmethod
def get_lb_for_amphora(session, amphora_id):
"""Get all of the load balancers on an amphora.
:param session: A Sql Alchemy database session.
:param amphora_id: The amphora id to list the load balancers from
:returns: [octavia.common.data_model]
"""
with session.begin(subtransactions=True):
lb_subquery = (session.query(self.model_class.load_balancer_id).
filter_by(id=amphora_id).subquery())
lb_list = (session.query(models.LoadBalancer).
filter(models.LoadBalancer.id.in_(lb_subquery)).
options(joinedload('*')).all())
data_model_list = [model.to_data_model() for model in lb_list]
return data_model_list
with session.begin():
db_lb = (
# Get LB records
session.query(models.LoadBalancer)
# Joined to amphora records
.filter(models.LoadBalancer.id ==
models.Amphora.load_balancer_id)
# For just this amphora
.filter(models.Amphora.id == amphora_id)
# Where the amphora is not DELETED
.filter(models.Amphora.status != consts.DELETED)
# And the LB is also not DELETED
.filter(models.LoadBalancer.provisioning_status !=
consts.DELETED)
# And what does this do? Some SQLAlchemy magic?
.options(joinedload('*'))
).first()
if db_lb:
return db_lb.to_data_model()
def get_all_deleted_expiring_amphora(self, session, exp_age=None):

@ -3032,13 +3032,12 @@ class AmphoraRepositoryTest(BaseRepositoryTest):
self.assertIsNotNone(new_amphora)
self.assertIsInstance(new_amphora, models.Amphora)
def test_get_all_lbs_on_amphora(self):
def test_get_lb_for_amphora(self):
amphora = self.create_amphora(self.FAKE_UUID_1)
self.amphora_repo.associate(self.session, self.lb.id, amphora.id)
lb_list = self.amphora_repo.get_all_lbs_on_amphora(self.session,
amphora.id)
self.assertIsNotNone(lb_list)
self.assertIn(self.lb, lb_list)
lb = self.amphora_repo.get_lb_for_amphora(self.session, amphora.id)
self.assertIsNotNone(lb)
self.assertEqual(self.lb, lb)
def get_all_deleted_expiring_amphora(self):
exp_age = datetime.timedelta(seconds=self.FAKE_EXP_AGE)

@ -62,24 +62,26 @@ class TestHeartbeatUDP(base.TestCase):
@mock.patch('stevedore.driver.DriverManager')
def test_update_health_func(self, driver_manager):
obj = {'id': 1}
heartbeat_udp.update_health(obj)
heartbeat_udp.update_health(obj, '192.0.2.1')
driver_manager.assert_called_once_with(
invoke_on_load=True,
name='health_logger',
namespace='octavia.amphora.health_update_drivers'
)
driver_manager().driver.update_health.assert_called_once_with(obj)
driver_manager().driver.update_health.assert_called_once_with(
obj, '192.0.2.1')
@mock.patch('stevedore.driver.DriverManager')
def test_update_stats_func(self, driver_manager):
obj = {'id': 1}
heartbeat_udp.update_stats(obj)
heartbeat_udp.update_stats(obj, '192.0.2.1')
driver_manager.assert_called_once_with(
invoke_on_load=True,
name='stats_logger',
namespace='octavia.amphora.stats_update_drivers'
)
driver_manager().driver.update_stats.assert_called_once_with(obj)
driver_manager().driver.update_stats.assert_called_once_with(
obj, '192.0.2.1')
@mock.patch('socket.getaddrinfo')
@mock.patch('socket.socket')
@ -117,9 +119,9 @@ class TestHeartbeatUDP(base.TestCase):
'1aa050041b506245806e5c1971e79951818394e'
'a6e71ad989ff950945f9573f4ab6f83e25db8ed7')
bin_msg = binascii.unhexlify(sample_msg)
recvfrom.return_value = bin_msg, 2
recvfrom.return_value = bin_msg, ('192.0.2.1', 2)
(obj, srcaddr) = getter.dorecv()
self.assertEqual(2, srcaddr)
self.assertEqual('192.0.2.1', srcaddr)
self.assertIsNotNone(obj.pop('recv_time'))
self.assertEqual({"testkey": "TEST"}, obj)
@ -163,8 +165,8 @@ class TestHeartbeatUDP(base.TestCase):
getter.check()
getter.executor.shutdown()
mock_executor.submit.assert_has_calls(
[mock.call(heartbeat_udp.update_health, {'id': 1}),
mock.call(heartbeat_udp.update_stats, {'id': 1})])
[mock.call(heartbeat_udp.update_health, {'id': 1}, 2),
mock.call(heartbeat_udp.update_stats, {'id': 1}, 2)])
@mock.patch('socket.getaddrinfo')
@mock.patch('socket.socket')

@ -25,7 +25,7 @@ class TestHealthUpdateBase(base.TestCase):
def test_update_health(self):
self.assertRaises(NotImplementedError,
self.logger.update_health, {'id': 1})
self.logger.update_health, {'id': 1}, '192.0.2.1')
class TestStatsUpdateBase(base.TestCase):
@ -35,4 +35,4 @@ class TestStatsUpdateBase(base.TestCase):
def test_update_stats(self):
self.assertRaises(NotImplementedError,
self.logger.update_stats, {'id': 1})
self.logger.update_stats, {'id': 1}, '192.0.2.1')

@ -66,7 +66,7 @@ class TestUpdateHealthDb(base.TestCase):
self.hm.amphora_repo = self.amphora_repo
fake_lb = mock.MagicMock()
self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [fake_lb]
self.hm.amphora_repo.get_lb_for_amphora.return_value = fake_lb
self.hm.amphora_health_repo = self.amphora_health_repo
self.hm.listener_repo = self.listener_repo
self.hm.listener_repo.count.return_value = 1
@ -132,9 +132,9 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health)
self.hm.update_health(health, '192.0.2.1')
self.event_client.cast.assert_any_call(
{}, 'update_info', container={
'info_type': 'listener', 'info_id': 'listener-id-1',
@ -158,10 +158,10 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(listener=False, pool=False))
self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health)
self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called)
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
self.assertTrue(self.loadbalancer_repo.update.called)
self.assertTrue(self.amphora_health_repo.replace.called)
@ -176,10 +176,10 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(listener=True, pool=False,
lb_prov_status=constants.PENDING_UPDATE))
self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health)
self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called)
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
self.assertTrue(self.loadbalancer_repo.update.called)
self.assertTrue(self.amphora_health_repo.replace.called)
@ -193,10 +193,10 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(listener=True, pool=False))
self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health)
self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called)
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
self.assertTrue(self.loadbalancer_repo.update.called)
self.assertFalse(self.amphora_health_repo.replace.called)
@ -210,10 +210,10 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(listener=False, pool=False))
self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health)
self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called)
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
# Receive time is stale, so we shouldn't see this called
self.assertFalse(self.loadbalancer_repo.update.called)
@ -221,15 +221,25 @@ class TestUpdateHealthDb(base.TestCase):
health = {
"id": self.FAKE_UUID_1,
"listeners": {},
"listeners": {
"listener-id-1": {"status": constants.OPEN, "pools": {
"pool-id-1": {"status": constants.UP,
"members": {"member-id-1": constants.UP}
}
}
}
},
"recv_time": time.time()
}
self.session_mock.commit.side_effect = TestException('boom')
self.amphora_repo.get_all_lbs_on_amphora.return_value = []
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
self._make_mock_lb_tree())
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health)
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
self.session_mock.rollback.assert_called_once()
@ -250,8 +260,8 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.hm.update_health(health)
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
@ -281,7 +291,7 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb.listeners = [mock_listener1, mock_listener2]
self.amphora_health_repo.replace.reset_mock()
self.hm.update_health(health)
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(not self.amphora_health_repo.replace.called)
def test_update_lb_pool_health_offline(self):
@ -297,8 +307,8 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.hm.update_health(health)
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
@ -344,8 +354,8 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb.listeners.append(mock_listener2)
mock_lb.pools.append(mock_pool2)
self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.hm.update_health(health)
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
@ -381,8 +391,8 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.hm.update_health(health)
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
@ -423,8 +433,8 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.hm.update_health(health)
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
@ -454,8 +464,8 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.hm.update_health(health)
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
@ -494,8 +504,8 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.hm.update_health(health)
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
@ -534,8 +544,8 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.hm.update_health(health)
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
@ -570,9 +580,9 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health)
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
@ -613,9 +623,9 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(health_monitor=False))
self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health)
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
@ -653,9 +663,9 @@ class TestUpdateHealthDb(base.TestCase):
self._make_mock_lb_tree(health_monitor=False))
mock_members[0].admin_state_up = False
mock_members[0].operating_status = constants.NO_MONITOR
self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health)
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
@ -694,8 +704,8 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.hm.update_health(health)
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
@ -735,9 +745,9 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(members=2))
self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.hm.update_health(health)
self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called)
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
# test listener, member
for listener_id, listener in six.iteritems(
@ -780,8 +790,8 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.hm.update_health(health)
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
@ -811,7 +821,7 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb.listeners.append(mock_listener2)
self.amphora_health_repo.replace.reset_mock()
self.hm.update_health(health)
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(not self.amphora_health_repo.replace.called)
def test_update_health_error(self):
@ -831,9 +841,9 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health)
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
@ -922,9 +932,9 @@ class TestUpdateHealthDb(base.TestCase):
mock_listener.pools = [mock_pool]
mock_lb.listeners.append(mock_listener)
self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health)
self.hm.update_health(health, '192.0.2.1')
# test listener
self.listener_repo.update.assert_any_call(
@ -973,9 +983,9 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health)
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
@ -1024,9 +1034,9 @@ class TestUpdateHealthDb(base.TestCase):
mock_pool1.operating_status = constants.ONLINE
mock_listener1.operating_status = constants.ONLINE
mock_lb.operating_status = constants.ONLINE
self.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health)
self.hm.update_health(health, '192.0.2.1')
self.event_client.cast.assert_not_called()
self.loadbalancer_repo.update.assert_not_called()
self.listener_repo.update.assert_not_called()
@ -1042,10 +1052,10 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(listener=False, pool=False))
mock_lb.enabled = False
self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health)
self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called)
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
self.assertTrue(self.loadbalancer_repo.update.called)
self.loadbalancer_repo.update.assert_called_with(
self.mock_session(), mock_lb.id,
@ -1060,10 +1070,10 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(listener=False, pool=False))
mock_lb.enabled = True
self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [mock_lb]
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.hm.update_health(health)
self.assertTrue(self.amphora_repo.get_all_lbs_on_amphora.called)
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
self.assertTrue(self.loadbalancer_repo.update.called)
self.loadbalancer_repo.update.assert_called_with(
self.mock_session(), mock_lb.id,
@ -1146,7 +1156,7 @@ class TestUpdateStatsDb(base.TestCase):
session.return_value = 'blah'
self.sm.update_stats(health)
self.sm.update_stats(health, '192.0.2.1')
self.listener_stats_repo.replace.assert_called_once_with(
'blah', self.listener_id, self.amphora_id,

@ -28,7 +28,7 @@ class TestHealthUpdateLogger(base.TestCase):
@mock.patch('octavia.controller.healthmanager.health_drivers'
'.update_logging.LOG')
def test_update_health(self, mock_log):
self.logger.update_health({'id': 1})
self.logger.update_health({'id': 1}, '192.0.2.1')
self.assertEqual(1, mock_log.info.call_count)
@ -40,5 +40,5 @@ class TestStatsUpdateLogger(base.TestCase):
@mock.patch('octavia.controller.healthmanager.health_drivers'
'.update_logging.LOG')
def test_update_stats(self, mock_log):
self.logger.update_stats({'id': 1})
self.logger.update_stats({'id': 1}, '192.0.2.1')
self.assertEqual(1, mock_log.info.call_count)

@ -1,80 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import mock
from oslo_utils import uuidutils
from octavia.controller.worker import controller_worker
from octavia.controller.worker.tasks import controller_tasks
from octavia.db import repositories as repo
import octavia.tests.unit.base as base
AMP_ID = uuidutils.generate_uuid()
LB1_ID = uuidutils.generate_uuid()
LB2_ID = uuidutils.generate_uuid()
LISTENER1_ID = uuidutils.generate_uuid()
LISTENER2_ID = uuidutils.generate_uuid()
_lb1_mock = mock.MagicMock()
_lb1_mock.id = LB1_ID
_lb2_mock = mock.MagicMock()
_lb2_mock.id = LB2_ID
_lbs = [_lb1_mock, _lb2_mock]
_listener1_mock = mock.MagicMock()
_listener1_mock.id = LISTENER1_ID
_listener1_mock.enabled = False
_listener2_mock = mock.MagicMock()
_listener2_mock.id = LISTENER2_ID
_listener2_mock.enabled = True
_listeners = [_listener1_mock, _listener2_mock]
@mock.patch('octavia.db.api.get_session', return_value='TEST')
class TestControllerTasks(base.TestCase):
def setUp(self):
self.amphora_mock = mock.MagicMock()
self.amphora_mock.id = AMP_ID
self.loadbalancer_mock = mock.MagicMock()
self.loadbalancer_mock.id = LB1_ID
self.loadbalancer_mock.enabled = True
super(TestControllerTasks, self).setUp()
@mock.patch('octavia.controller.worker.controller_worker.'
'ControllerWorker.delete_load_balancer')
@mock.patch('octavia.db.repositories.AmphoraRepository.'
'get_all_lbs_on_amphora',
return_value=_lbs)
def test_delete_load_balancers_on_amp(self,
mock_get_all_lbs_on_amp,
mock_delete_lb,
mock_get_session):
delete_lbs_on_amp = controller_tasks.DeleteLoadBalancersOnAmp()
delete_lbs_on_amp.execute(self.amphora_mock)
repo.AmphoraRepository.get_all_lbs_on_amphora.assert_called_once_with(
'TEST',
amphora_id=AMP_ID)
(controller_worker.
ControllerWorker.delete_load_balancer.assert_has_calls)([
mock.call(LB1_ID),
mock.call(LB2_ID)], any_order=False)

@ -1228,13 +1228,13 @@ class TestControllerWorker(base.TestCase):
'amphora_flows.AmphoraFlows.get_failover_flow',
return_value=_flow_mock)
@mock.patch(
'octavia.db.repositories.AmphoraRepository.get_all_lbs_on_amphora',
return_value=[_load_balancer_mock])
'octavia.db.repositories.AmphoraRepository.get_lb_for_amphora',
return_value=_load_balancer_mock)
@mock.patch('octavia.db.repositories.LoadBalancerRepository.update')
def test_failover_amphora_anti_affinity(self,
mock_update,
mock_get_update_listener_flow,
mock_get_all_lbs_for_amp_mock,
mock_get_lb_for_amphora,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,

Loading…
Cancel
Save