Fix amp failover where failover already failed

If a failover ran on an amphora, was unsuccessful, and was reverted, the
failover would mark the amp status "DELETED" and un-busy the health
record. The amp would then be picked up on the next failover check,
start failing over again, and break out early since it appeared to be
"unallocated".

Also, housekeeping now selects expired amphora records for cleanup based
on the amphora's updated_at time rather than on the health record's time
alone (the health record must also be expired before a purge), which
means the records won't be immediately cleaned up anymore after they go
through failover flows.

Change-Id: I848b7fc69b977fcb39f8a07e2ea5fc7bd37b5c7a
This commit is contained in:
Adam Harwell 2018-03-01 15:46:56 +00:00
parent 0359422739
commit 96cce3ed74
11 changed files with 177 additions and 71 deletions

View File

@ -72,18 +72,24 @@ class DatabaseCleanup(object):
seconds=CONF.house_keeping.amphora_expiry_age)
session = db_api.get_session()
amphora, _ = self.amp_repo.get_all(session, status=constants.DELETED)
expiring_amphora = self.amp_repo.get_all_deleted_expiring_amphora(
session, exp_age=exp_age)
for amp in amphora:
if self.amp_health_repo.check_amphora_expired(session, amp.id,
exp_age):
LOG.info('Attempting to delete Amphora id : %s', amp.id)
for amp in expiring_amphora:
# If we're here, we already think the amp is expiring according to
# the amphora table. Now check it is expired in the health table.
# In this way, we ensure that amps aren't deleted unless they are
# both expired AND no longer receiving zombie heartbeats.
if self.amp_health_repo.check_amphora_health_expired(
session, amp.id, exp_age):
LOG.debug('Attempting to purge db record for Amphora ID: %s',
amp.id)
self.amp_repo.delete(session, id=amp.id)
try:
self.amp_health_repo.delete(session, amphora_id=amp.id)
except sqlalchemy_exceptions.NoResultFound:
pass # Best effort delete, this record might not exist
LOG.info('Deleted Amphora id : %s', amp.id)
LOG.info('Purged db record for Amphora ID: %s', amp.id)
def cleanup_load_balancers(self):
"""Checks the DB for old load balancers and triggers their removal."""

View File

@ -732,11 +732,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
if amp.status == constants.DELETED:
LOG.warning('Amphora %s is marked DELETED in the database but '
'was submitted for failover. Marking it busy in the '
'was submitted for failover. Deleting it from the '
'amphora health table to exclude it from health '
'checks and skipping the failover.', amp.id)
self._amphora_health_repo.update(db_apis.get_session(), amp.id,
busy=True)
self._amphora_health_repo.delete(db_apis.get_session(),
amphora_id=amp.id)
return
if (CONF.house_keeping.spare_amphora_pool_size == 0) and (
@ -755,8 +755,8 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
lb[0].server_group_id)
failover_amphora_tf = self._taskflow_load(
self._amphora_flows.get_failover_flow(role=amp.role,
status=amp.status),
self._amphora_flows.get_failover_flow(
role=amp.role, load_balancer_id=amp.load_balancer_id),
store=stored_params)
with tf_logging.DynamicLoggingListener(

View File

@ -290,7 +290,7 @@ class AmphoraFlows(object):
return delete_amphora_flow
def get_failover_flow(self, role=constants.ROLE_STANDALONE,
status=constants.AMPHORA_READY):
load_balancer_id=None):
"""Creates a flow to failover a stale amphora
:returns: The flow for amphora failover
@ -329,16 +329,16 @@ class AmphoraFlows(object):
failover_amphora_flow.add(network_tasks.WaitForPortDetach(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
failover_amphora_flow.add(
database_tasks.DisableAmphoraHealthMonitoring(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
failover_amphora_flow.add(database_tasks.MarkAmphoraDeletedInDB(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
# If this is an unallocated amp (spares pool), we're done
if status != constants.AMPHORA_ALLOCATED:
if not load_balancer_id:
failover_amphora_flow.add(
database_tasks.DisableAmphoraHealthMonitoring(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
return failover_amphora_flow
# Save failed amphora details for later
@ -413,6 +413,10 @@ class AmphoraFlows(object):
failover_amphora_flow.add(amphora_driver_tasks.ListenersStart(
requires=(constants.LOADBALANCER, constants.LISTENERS)))
failover_amphora_flow.add(
database_tasks.DisableAmphoraHealthMonitoring(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
return failover_amphora_flow

View File

@ -33,7 +33,6 @@ class AmphoraIDToErrorOnRevertTask(BaseLifecycleTask):
def revert(self, amphora_id, *args, **kwargs):
self.task_utils.mark_amphora_status_error(amphora_id)
self.task_utils.unmark_amphora_health_busy(amphora_id)
class AmphoraToErrorOnRevertTask(AmphoraIDToErrorOnRevertTask):

View File

@ -954,6 +954,33 @@ class AmphoraRepository(BaseRepository):
data_model_list = [model.to_data_model() for model in lb_list]
return data_model_list
def get_all_deleted_expiring_amphora(self, session, exp_age=None):
    """Get all previously deleted amphora that are now expiring.

    Selects the rows whose status is DELETED and whose ``updated_at`` is
    strictly older than ``utcnow() - exp_age``.

    :param session: A Sql Alchemy database session.
    :param exp_age: A standard datetime delta giving how long an amphora
                    may go without updates before it is considered
                    expired (default:
                    CONF.house_keeping.amphora_expiry_age)
    :returns: [octavia.common.data_model]
    """
    # Test against None rather than truthiness: a zero timedelta is falsy,
    # so "not exp_age" would silently replace an explicit zero age with the
    # configured default.
    if exp_age is None:
        exp_age = datetime.timedelta(
            seconds=CONF.house_keeping.amphora_expiry_age)

    expiry_time = datetime.datetime.utcnow() - exp_age

    query = session.query(self.model_class).filter_by(
        status=consts.DELETED).filter(
            self.model_class.updated_at < expiry_time)
    # Only make one trip to the database
    query = query.options(joinedload('*'))
    model_list = query.all()

    return [model.to_data_model() for model in model_list]
def get_spare_amphora_count(self, session):
"""Get the count of the spare amphora.
@ -1096,8 +1123,8 @@ class AmphoraHealthRepository(BaseRepository):
model_kwargs['amphora_id'] = amphora_id
self.create(session, **model_kwargs)
def check_amphora_expired(self, session, amphora_id, exp_age=None):
"""check if a specific amphora is expired
def check_amphora_health_expired(self, session, amphora_id, exp_age=None):
"""check if a specific amphora is expired in the amphora_health table
:param session: A Sql Alchemy database session.
:param amphora_id: id of an amphora object
@ -1111,16 +1138,28 @@ class AmphoraHealthRepository(BaseRepository):
exp_age = datetime.timedelta(
seconds=CONF.house_keeping.amphora_expiry_age)
timestamp = datetime.datetime.utcnow() - exp_age
amphora_health = self.get(session, amphora_id=amphora_id)
if amphora_health is not None:
return amphora_health.last_update < timestamp
else:
# Amphora was just destroyed.
return True
expiry_time = datetime.datetime.utcnow() - exp_age
amphora_model = (
session.query(models.AmphoraHealth)
.filter_by(amphora_id=amphora_id)
.filter(models.AmphoraHealth.last_update > expiry_time)
).first()
# This will return a value if:
# * there is an entry in the table for this amphora_id
# AND
# * the entry was last updated more recently than our expiry_time
# Receiving any value means that the amp is unexpired.
# In contrast, we receive no value if:
# * there is no entry for this amphora_id
# OR
# * the entry was last updated before our expiry_time
# In this case, the amphora is expired.
return amphora_model is None
def get_stale_amphora(self, session):
"""Retrieves a staled amphora from the health manager database.
"""Retrieves a stale amphora from the health manager database.
:param session: A Sql Alchemy database session.
:returns: [octavia.common.data_model]

View File

@ -2946,17 +2946,20 @@ class AmphoraRepositoryTest(BaseRepositoryTest):
provisioning_status=constants.ACTIVE,
operating_status=constants.ONLINE, enabled=True)
def create_amphora(self, amphora_id):
expiration = datetime.datetime.utcnow()
amphora = self.amphora_repo.create(self.session, id=amphora_id,
compute_id=self.FAKE_UUID_3,
status=constants.ACTIVE,
lb_network_ip=self.FAKE_IP,
vrrp_ip=self.FAKE_IP,
ha_ip=self.FAKE_IP,
role=constants.ROLE_MASTER,
cert_expiration=expiration,
cert_busy=False)
def create_amphora(self, amphora_id, **overrides):
    """Create an amphora row for tests and return what the repo returns.

    A fixed set of default column values is used; any keyword passed in
    ``overrides`` replaces the default of the same name (or adds a new
    column value) before the row is created.

    :param amphora_id: value stored as the amphora's ``id``
    :param overrides: column values that take precedence over the defaults
    :returns: the result of ``self.amphora_repo.create``
    """
    amphora_kwargs = {
        'id': amphora_id,
        'compute_id': self.FAKE_UUID_3,
        'status': constants.ACTIVE,
        'lb_network_ip': self.FAKE_IP,
        'vrrp_ip': self.FAKE_IP,
        'ha_ip': self.FAKE_IP,
        'role': constants.ROLE_MASTER,
        'cert_expiration': datetime.datetime.utcnow(),
        'cert_busy': False
    }
    # Caller-supplied values win over the defaults above.
    amphora_kwargs.update(overrides)
    return self.amphora_repo.create(self.session, **amphora_kwargs)
def test_get(self):
@ -3037,6 +3040,20 @@ class AmphoraRepositoryTest(BaseRepositoryTest):
self.assertIsNotNone(lb_list)
self.assertIn(self.lb, lb_list)
def test_get_all_deleted_expiring_amphora(self):
    """Only DELETED amphorae older than the expiry age are returned."""
    # NOTE: the method name carries the "test_" prefix so that test
    # loaders actually collect and run it.
    exp_age = datetime.timedelta(seconds=self.FAKE_EXP_AGE)
    # Back-date by one extra second: the repository filter is a strict
    # "updated_at < utcnow() - exp_age", so a timestamp of exactly
    # utcnow() - exp_age would only match thanks to the clock advancing
    # between the two utcnow() calls.
    updated_at = (datetime.datetime.utcnow() - exp_age -
                  datetime.timedelta(seconds=1))
    amphora1 = self.create_amphora(
        self.FAKE_UUID_1, updated_at=updated_at, status=constants.DELETED)
    # No back-dated updated_at here, so this one must not be reported.
    amphora2 = self.create_amphora(
        self.FAKE_UUID_2, status=constants.DELETED)

    expiring_list = self.amphora_repo.get_all_deleted_expiring_amphora(
        self.session, exp_age=exp_age)
    expiring_ids = [amp.id for amp in expiring_list]
    self.assertIn(amphora1.id, expiring_ids)
    self.assertNotIn(amphora2.id, expiring_ids)
def test_get_spare_amphora_count(self):
count = self.amphora_repo.get_spare_amphora_count(self.session)
self.assertEqual(0, count)
@ -3131,7 +3148,7 @@ class AmphoraHealthRepositoryTest(BaseRepositoryTest):
def test_check_amphora_expired_default_exp_age(self):
"""When exp_age defaults to CONF.house_keeping.amphora_expiry_age."""
self.create_amphora_health(self.amphora.id)
checkres = self.amphora_health_repo.check_amphora_expired(
checkres = self.amphora_health_repo.check_amphora_health_expired(
self.session, self.amphora.id)
# Default amphora_expiry_age value is 1 week so amphora shouldn't be
# considered expired.
@ -3142,13 +3159,13 @@ class AmphoraHealthRepositoryTest(BaseRepositoryTest):
exp_age = datetime.timedelta(
seconds=self.FAKE_EXP_AGE)
self.create_amphora_health(self.amphora.id)
checkres = self.amphora_health_repo.check_amphora_expired(
checkres = self.amphora_health_repo.check_amphora_health_expired(
self.session, self.amphora.id, exp_age)
self.assertTrue(checkres)
def test_check_amphora_expired_with_no_age(self):
"""When the amphora_health entry is missing in the DB."""
checkres = self.amphora_health_repo.check_amphora_expired(
checkres = self.amphora_health_repo.check_amphora_health_expired(
self.session, self.amphora.id)
self.assertTrue(checkres)

View File

@ -12,8 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import mock
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
from octavia.common import constants
@ -49,13 +52,14 @@ class TestSpareCheck(base.TestCase):
self.spare_amp.amp_repo = self.amp_repo
self.spare_amp.cw = self.cw
self.CONF = cfg.CONF
self.CONF = self.useFixture(oslo_fixture.Config(cfg.CONF))
@mock.patch('octavia.db.api.get_session')
def test_spare_check_diff_count(self, session):
"""When spare amphora count does not meet the requirement."""
session.return_value = session
self.CONF.house_keeping.spare_amphora_pool_size = self.FAKE_CNF_SPAR1
self.CONF.config(group="house_keeping",
spare_amphora_pool_size=self.FAKE_CNF_SPAR1)
self.amp_repo.get_spare_amphora_count.return_value = (
self.FAKE_CUR_SPAR1)
self.spare_amp.spare_check()
@ -68,7 +72,8 @@ class TestSpareCheck(base.TestCase):
def test_spare_check_no_diff_count(self, session):
"""When spare amphora count meets the requirement."""
session.return_value = session
self.CONF.house_keeping.spare_amphora_pool_size = self.FAKE_CNF_SPAR2
self.CONF.config(group="house_keeping",
spare_amphora_pool_size=self.FAKE_CNF_SPAR2)
self.amp_repo.get_spare_amphora_count.return_value = (
self.FAKE_CUR_SPAR2)
self.spare_amp.spare_check()
@ -83,7 +88,7 @@ class TestDatabaseCleanup(base.TestCase):
FAKE_IP = "10.0.0.1"
FAKE_UUID_1 = uuidutils.generate_uuid()
FAKE_UUID_2 = uuidutils.generate_uuid()
FAKE_EXP_AGE = 10
FAKE_EXP_AGE = 60
def setUp(self):
super(TestDatabaseCleanup, self).setUp()
@ -95,48 +100,84 @@ class TestDatabaseCleanup(base.TestCase):
self.dbclean.amp_repo = self.amp_repo
self.dbclean.amp_health_repo = self.amp_health_repo
self.CONF = cfg.CONF
self.CONF = self.useFixture(oslo_fixture.Config(cfg.CONF))
@mock.patch('octavia.db.api.get_session')
def test_delete_old_amphorae_True(self, session):
"""When the deleted amphorae is expired."""
session.return_value = session
self.CONF.house_keeping.amphora_expiry_age = self.FAKE_EXP_AGE
self.CONF.config(group="house_keeping",
amphora_expiry_age=self.FAKE_EXP_AGE)
expired_time = datetime.datetime.utcnow() - datetime.timedelta(
seconds=self.FAKE_EXP_AGE + 1)
amphora = self.amp.create(session, id=self.FAKE_UUID_1,
compute_id=self.FAKE_UUID_2,
status=constants.DELETED,
lb_network_ip=self.FAKE_IP,
vrrp_ip=self.FAKE_IP,
ha_ip=self.FAKE_IP)
self.amp_repo.get_all.return_value = ([amphora], None)
self.amp_health_repo.check_amphora_expired.return_value = True
ha_ip=self.FAKE_IP,
updated_at=expired_time)
self.amp_repo.get_all_deleted_expiring_amphora.return_value = [amphora]
self.amp_health_repo.check_amphora_health_expired.return_value = True
self.dbclean.delete_old_amphorae()
self.assertTrue(self.amp_repo.get_all.called)
self.assertTrue(self.amp_health_repo.check_amphora_expired.called)
self.assertTrue(self.amp_repo.get_all_deleted_expiring_amphora.called)
self.assertTrue(
self.amp_health_repo.check_amphora_health_expired.called)
self.assertTrue(self.amp_repo.delete.called)
@mock.patch('octavia.db.api.get_session')
def test_delete_old_amphorae_False(self, session):
"""When the deleted amphorae is not expired."""
session.return_value = session
self.CONF.house_keeping.amphora_expiry_age = self.FAKE_EXP_AGE
self.CONF.config(group="house_keeping",
amphora_expiry_age=self.FAKE_EXP_AGE)
self.amp.create(session, id=self.FAKE_UUID_1,
compute_id=self.FAKE_UUID_2,
status=constants.DELETED,
lb_network_ip=self.FAKE_IP,
vrrp_ip=self.FAKE_IP,
ha_ip=self.FAKE_IP,
updated_at=datetime.datetime.now())
self.amp_repo.get_all_deleted_expiring_amphora.return_value = []
self.dbclean.delete_old_amphorae()
self.assertTrue(self.amp_repo.get_all_deleted_expiring_amphora.called)
self.assertFalse(
self.amp_health_repo.check_amphora_health_expired.called)
self.assertFalse(self.amp_repo.delete.called)
@mock.patch('octavia.db.api.get_session')
def test_delete_old_amphorae_Zombie(self, session):
"""When the deleted amphorae is expired but is a zombie!
This is when the amphora is expired in the amphora table, but in the
amphora_health table there are newer records, meaning the amp checked
in with the healthmanager *after* it was deleted (and craves brains).
"""
session.return_value = session
self.CONF.config(group="house_keeping",
amphora_expiry_age=self.FAKE_EXP_AGE)
expired_time = datetime.datetime.utcnow() - datetime.timedelta(
seconds=self.FAKE_EXP_AGE + 1)
amphora = self.amp.create(session, id=self.FAKE_UUID_1,
compute_id=self.FAKE_UUID_2,
status=constants.DELETED,
lb_network_ip=self.FAKE_IP,
vrrp_ip=self.FAKE_IP,
ha_ip=self.FAKE_IP)
self.amp_repo.get_all.return_value = ([amphora], None)
self.amp_health_repo.check_amphora_expired.return_value = False
ha_ip=self.FAKE_IP,
updated_at=expired_time)
self.amp_repo.get_all_deleted_expiring_amphora.return_value = [amphora]
self.amp_health_repo.check_amphora_health_expired.return_value = False
self.dbclean.delete_old_amphorae()
self.assertTrue(self.amp_repo.get_all.called)
self.assertTrue(self.amp_health_repo.check_amphora_expired.called)
self.assertTrue(self.amp_repo.get_all_deleted_expiring_amphora.called)
self.assertTrue(
self.amp_health_repo.check_amphora_health_expired.called)
self.assertFalse(self.amp_repo.delete.called)
@mock.patch('octavia.db.api.get_session')
def test_delete_old_load_balancer(self, session):
"""Check delete of load balancers in DELETED provisioning status."""
self.CONF.house_keeping.load_balancer_expiry_age = self.FAKE_EXP_AGE
self.CONF.config(group="house_keeping",
load_balancer_expiry_age=self.FAKE_EXP_AGE)
session.return_value = session
load_balancer = self.lb.create(session, id=self.FAKE_UUID_1,
provisioning_status=constants.DELETED,

View File

@ -237,7 +237,7 @@ class TestAmphoraFlows(base.TestCase):
def test_get_failover_flow_allocated(self, mock_get_net_driver):
amp_flow = self.AmpFlow.get_failover_flow(
status=constants.AMPHORA_ALLOCATED)
load_balancer_id='mylb')
self.assertIsInstance(amp_flow, flow.Flow)
@ -257,7 +257,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertEqual(11, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(
role=constants.ROLE_MASTER, status=constants.AMPHORA_ALLOCATED)
role=constants.ROLE_MASTER, load_balancer_id='mylb')
self.assertIsInstance(amp_flow, flow.Flow)
@ -277,7 +277,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertEqual(11, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(
role=constants.ROLE_BACKUP, status=constants.AMPHORA_ALLOCATED)
role=constants.ROLE_BACKUP, load_balancer_id='mylb')
self.assertIsInstance(amp_flow, flow.Flow)
@ -297,7 +297,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertEqual(11, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(
role='BOGUSROLE', status=constants.AMPHORA_ALLOCATED)
role='BOGUSROLE', load_balancer_id='mylb')
self.assertIsInstance(amp_flow, flow.Flow)
@ -319,7 +319,7 @@ class TestAmphoraFlows(base.TestCase):
def test_get_failover_flow_spare(self, mock_get_net_driver):
amp_flow = self.AmpFlow.get_failover_flow(
status=constants.AMPHORA_READY)
load_balancer_id=None)
self.assertIsInstance(amp_flow, flow.Flow)

View File

@ -72,7 +72,7 @@ class TestLifecycleTasks(base.TestCase):
amp_id_to_error_on_revert.revert(self.AMPHORA_ID)
mock_amp_status_error.assert_called_once_with(self.AMPHORA_ID)
mock_amp_health_busy.assert_called_once_with(self.AMPHORA_ID)
self.assertFalse(mock_amp_health_busy.called)
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'unmark_amphora_health_busy')
@ -92,7 +92,7 @@ class TestLifecycleTasks(base.TestCase):
amp_to_error_on_revert.revert(self.AMPHORA)
mock_amp_status_error.assert_called_once_with(self.AMPHORA_ID)
mock_amp_health_busy.assert_called_once_with(self.AMPHORA_ID)
self.assertFalse(mock_amp_health_busy.called)
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_health_mon_prov_status_error')

View File

@ -1155,9 +1155,9 @@ class TestControllerWorker(base.TestCase):
mock_update.assert_called_with('TEST', LB_ID,
provisioning_status=constants.ACTIVE)
@mock.patch('octavia.db.repositories.AmphoraHealthRepository.update')
@mock.patch('octavia.db.repositories.AmphoraHealthRepository.delete')
def test_failover_deleted_amphora(self,
mock_update,
mock_delete,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
@ -1178,7 +1178,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw._perform_amphora_failover(mock_amphora, 10)
mock_update.assert_called_with('TEST', AMP_ID, busy=True)
mock_delete.assert_called_with('TEST', amphora_id=AMP_ID)
mock_taskflow_load.assert_not_called()
@mock.patch('octavia.controller.worker.'

View File

@ -55,7 +55,7 @@ def generate(flow_list, output_directory):
current_tuple[2] == 'get_failover_flow'):
current_engine = engines.load(
get_flow_method(role=constants.ROLE_STANDALONE,
status=constants.AMPHORA_ALLOCATED))
load_balancer_id=None))
elif (current_tuple[1] == 'LoadBalancerFlows' and
current_tuple[2] == 'get_create_load_balancer_flow'):
current_engine = engines.load(