Fix amphorae in ERROR during the failover

When 2 amps were down, the failover flow created the first one and
needed to update both amp to configure VRRP, but as the 2nd was missing,
it was set to ERROR. Then the health-manager could not trigger a
failover becasue amphorae in ERROR are excluded from the automated
failover process.

This commit changes the tasks that must be run on both amphorae during a
failover of one amphora, it doesn't mark the secondary amphora in ERROR
if it is not reachable.

Closes-Bug: #2033734

Note: stable/2023.1 and older, the patch also includes modifications in
      octavia/controller/worker/v1/

Conflicts:
	octavia/controller/worker/v2/tasks/amphora_driver_tasks.py

Change-Id: I4bd027346c61b93b537ab53810c2ecb6160b6be2
(cherry picked from commit 248cf2893e)
(cherry picked from commit 5622431534)
(cherry picked from commit 10cfd8c760)
This commit is contained in:
Gregory Thiemonge 2023-09-04 05:56:25 -04:00
parent dd849b4c5c
commit d09065c4f6
11 changed files with 221 additions and 48 deletions

View File

@ -279,6 +279,7 @@ class AmphoraFlows(object):
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF,
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict},
provides=constants.AMP_VRRP_INT))
@ -288,12 +289,14 @@ class AmphoraFlows(object):
requires=(constants.LOADBALANCER_ID,
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict}))
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
name=sf_name + '-0-' + constants.AMP_VRRP_START,
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict}))
@ -302,6 +305,7 @@ class AmphoraFlows(object):
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF,
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict},
provides=constants.AMP_VRRP_INT))
@ -311,11 +315,13 @@ class AmphoraFlows(object):
requires=(constants.LOADBALANCER_ID,
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict}))
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
name=sf_name + '-1-' + constants.AMP_VRRP_START,
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict}))
@ -583,6 +589,7 @@ class AmphoraFlows(object):
name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE,
requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: amp_index,
constants.TIMEOUT_DICT: timeout_dict}))
@ -611,6 +618,7 @@ class AmphoraFlows(object):
constants.AMPHORA_RELOAD_LISTENER),
requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: amp_index,
constants.TIMEOUT_DICT: timeout_dict}))

View File

@ -645,6 +645,7 @@ class LoadBalancerFlows(object):
constants.AMP_LISTENER_UPDATE),
requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict}))
update_amps_subflow.add(
@ -653,6 +654,7 @@ class LoadBalancerFlows(object):
constants.AMP_LISTENER_UPDATE),
requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict}))
@ -677,6 +679,7 @@ class LoadBalancerFlows(object):
name=(new_amp_role + '-' +
constants.AMPHORA_RELOAD_LISTENER),
requires=(constants.LOADBALANCER, constants.AMPHORAE),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict}))

View File

@ -75,7 +75,7 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask):
"""Task to update the listeners on one amphora."""
def execute(self, loadbalancer, amphora_index, amphorae,
amphorae_status: dict, timeout_dict=None):
amphorae_status: dict, new_amphora_id: str, timeout_dict=None):
# Note, we don't want this to cause a revert as it may be used
# in a failover flow with both amps failing. Skip it and let
# health manager fix it.
@ -97,8 +97,11 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask):
LOG.error('Failed to update listeners on amphora %s. Skipping '
'this amphora as it is failing to update due to: %s',
amphora_id, str(e))
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
# Update only the status of the newly created amphora during the
# failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
class ListenersUpdate(BaseAmphoraTask):
@ -138,7 +141,7 @@ class AmphoraIndexListenersReload(BaseAmphoraTask):
"""Task to reload all listeners on an amphora."""
def execute(self, loadbalancer, amphora_index, amphorae,
amphorae_status: dict, timeout_dict=None):
amphorae_status: dict, new_amphora_id: str, timeout_dict=None):
"""Execute listener reload routines for listeners on an amphora."""
if amphorae is None:
return
@ -158,8 +161,11 @@ class AmphoraIndexListenersReload(BaseAmphoraTask):
LOG.warning('Failed to reload listeners on amphora %s. '
'Skipping this amphora as it is failing to '
'reload due to: %s', amphora_id, str(e))
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
# Update only the status of the newly created amphora during
# the failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
class ListenerDelete(BaseAmphoraTask):
@ -324,7 +330,7 @@ class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask):
"""Task to get and update the VRRP interface device name from amphora."""
def execute(self, amphora_index, amphorae, amphorae_status: dict,
timeout_dict=None):
new_amphora_id: str, timeout_dict=None):
amphora_id = amphorae[amphora_index].id
amphora_status = amphorae_status.get(amphora_id, {})
if amphora_status.get(constants.UNREACHABLE):
@ -341,8 +347,11 @@ class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask):
LOG.error('Failed to get amphora VRRP interface on amphora '
'%s. Skipping this amphora as it is failing due to: '
'%s', amphora_id, str(e))
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
# Update only the status of the newly created amphora during the
# failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
return None
self.amphora_repo.update(db_apis.get_session(), amphora_id,
@ -380,7 +389,7 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask):
def execute(self, loadbalancer_id, amphorae_network_config, amphora_index,
amphorae, amphorae_status: dict, amp_vrrp_int: Optional[str],
timeout_dict=None):
new_amphora_id: str, timeout_dict=None):
"""Execute update_vrrp_conf."""
# Note, we don't want this to cause a revert as it may be used
# in a failover flow with both amps failing. Skip it and let
@ -403,8 +412,11 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask):
LOG.error('Failed to update VRRP configuration amphora %s. '
'Skipping this amphora as it is failing to update due '
'to: %s', amphora_id, str(e))
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
# Update only the status of the newly created amphora during the
# failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
return
LOG.debug("Uploaded VRRP configuration of amphora %s.", amphora_id)
@ -427,7 +439,7 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask):
"""
def execute(self, amphora_index, amphorae, amphorae_status: dict,
timeout_dict=None):
new_amphora_id: str, timeout_dict=None):
amphora_id = amphorae[amphora_index].id
amphora_status = amphorae_status.get(amphora_id, {})
if amphora_status.get(constants.UNREACHABLE):
@ -442,8 +454,11 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask):
LOG.error('Failed to start VRRP on amphora %s. '
'Skipping this amphora as it is failing to start due '
'to: %s', amphora_id, str(e))
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
# Update only the status of the newly created amphora during the
# failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
return
LOG.debug("Started VRRP on amphora %s.", amphorae[amphora_index].id)

View File

@ -265,6 +265,7 @@ class AmphoraFlows(object):
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF,
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict},
provides=constants.AMP_VRRP_INT))
@ -274,12 +275,14 @@ class AmphoraFlows(object):
requires=(constants.LOADBALANCER_ID,
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict}))
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
name=sf_name + '-0-' + constants.AMP_VRRP_START,
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict}))
@ -288,6 +291,7 @@ class AmphoraFlows(object):
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF,
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict},
provides=constants.AMP_VRRP_INT))
@ -297,11 +301,13 @@ class AmphoraFlows(object):
requires=(constants.LOADBALANCER_ID,
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict}))
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
name=sf_name + '-1-' + constants.AMP_VRRP_START,
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict}))
@ -570,6 +576,7 @@ class AmphoraFlows(object):
name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE,
requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: amp_index,
constants.TIMEOUT_DICT: timeout_dict}))
@ -598,6 +605,7 @@ class AmphoraFlows(object):
constants.AMPHORA_RELOAD_LISTENER),
requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: amp_index,
constants.TIMEOUT_DICT: timeout_dict}))

View File

@ -661,6 +661,7 @@ class LoadBalancerFlows(object):
constants.AMP_LISTENER_UPDATE),
requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict}))
update_amps_subflow.add(
@ -669,6 +670,7 @@ class LoadBalancerFlows(object):
constants.AMP_LISTENER_UPDATE),
requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict}))
@ -693,6 +695,7 @@ class LoadBalancerFlows(object):
name=(new_amp_role + '-' +
constants.AMPHORA_RELOAD_LISTENER),
requires=(constants.LOADBALANCER, constants.AMPHORAE),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict}))

View File

@ -103,7 +103,7 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask):
"""Task to update the listeners on one amphora."""
def execute(self, loadbalancer, amphora_index, amphorae,
amphorae_status: dict, timeout_dict=()):
amphorae_status: dict, new_amphora_id: str, timeout_dict=()):
# Note, we don't want this to cause a revert as it may be used
# in a failover flow with both amps failing. Skip it and let
# health manager fix it.
@ -130,8 +130,11 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask):
LOG.error('Failed to update listeners on amphora %s. Skipping '
'this amphora as it is failing to update due to: %s',
amphora_id, str(e))
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
# Update only the status of the newly created amphora during the
# failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
class ListenersUpdate(BaseAmphoraTask):
@ -188,7 +191,7 @@ class AmphoraIndexListenersReload(BaseAmphoraTask):
"""Task to reload all listeners on an amphora."""
def execute(self, loadbalancer, amphora_index, amphorae,
amphorae_status: dict, timeout_dict=None):
amphorae_status: dict, new_amphora_id: str, timeout_dict=None):
"""Execute listener reload routines for listeners on an amphora."""
if amphorae is None:
return
@ -214,8 +217,11 @@ class AmphoraIndexListenersReload(BaseAmphoraTask):
LOG.warning('Failed to reload listeners on amphora %s. '
'Skipping this amphora as it is failing to '
'reload due to: %s', amphora_id, str(e))
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
# Update only the status of the newly created amphora during
# the failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
class ListenerDelete(BaseAmphoraTask):
@ -456,7 +462,7 @@ class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask):
"""Task to get and update the VRRP interface device name from amphora."""
def execute(self, amphora_index, amphorae, amphorae_status: dict,
timeout_dict=None):
new_amphora_id: str, timeout_dict=None):
amphora_id = amphorae[amphora_index][constants.ID]
amphora_status = amphorae_status.get(amphora_id, {})
if amphora_status.get(constants.UNREACHABLE):
@ -476,8 +482,11 @@ class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask):
LOG.error('Failed to get amphora VRRP interface on amphora '
'%s. Skipping this amphora as it is failing due to: '
'%s', amphora_id, str(e))
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
# Update only the status of the newly created amphora during the
# failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
return None
self.amphora_repo.update(db_apis.get_session(), amphora_id,
@ -520,7 +529,7 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask):
def execute(self, loadbalancer_id, amphorae_network_config, amphora_index,
amphorae, amphorae_status: dict, amp_vrrp_int: Optional[str],
timeout_dict=None):
new_amphora_id: str, timeout_dict=None):
"""Execute update_vrrp_conf."""
# Note, we don't want this to cause a revert as it may be used
# in a failover flow with both amps failing. Skip it and let
@ -546,8 +555,11 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask):
LOG.error('Failed to update VRRP configuration amphora %s. '
'Skipping this amphora as it is failing to update due '
'to: %s', amphora_id, str(e))
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
# Update only the status of the newly created amphora during the
# failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
return
LOG.debug("Uploaded VRRP configuration of amphora %s.", amphora_id)
@ -574,7 +586,7 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask):
"""
def execute(self, amphora_index, amphorae, amphorae_status: dict,
timeout_dict=None):
new_amphora_id: str, timeout_dict=None):
# TODO(johnsom) Optimize this to use the dicts and not need the
# DB lookups
amphora_id = amphorae[amphora_index][constants.ID]
@ -591,8 +603,11 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask):
LOG.error('Failed to start VRRP on amphora %s. '
'Skipping this amphora as it is failing to start due '
'to: %s', amphora_id, str(e))
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
# Update only the status of the newly created amphora during the
# failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
return
LOG.debug("Started VRRP on amphora %s.",
amphorae[amphora_index][constants.ID])

View File

@ -346,10 +346,11 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires)
self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.requires)
self.assertEqual(2, len(vrrp_subflow.provides))
self.assertEqual(3, len(vrrp_subflow.requires))
self.assertEqual(4, len(vrrp_subflow.requires))
def test_get_vrrp_subflow_dont_create_vrrp_group(
self, mock_get_net_driver):

View File

@ -134,6 +134,7 @@ class TestAmphoraDriverTasks(base.TestCase):
amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate()
amp_list_update_obj.execute(_load_balancer_mock, 0,
[_amphora_mock], amphorae_status,
_amphora_mock.id,
self.timeout_dict)
mock_driver.update_amphora_listeners.assert_called_once_with(
@ -147,7 +148,9 @@ class TestAmphoraDriverTasks(base.TestCase):
}
}
amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock],
amphorae_status, self.timeout_dict)
amphorae_status,
_amphora_mock.id,
self.timeout_dict)
mock_driver.update_amphora_listeners.assert_not_called()
# Test exception
@ -155,11 +158,23 @@ class TestAmphoraDriverTasks(base.TestCase):
amp_list_update_obj.execute(_load_balancer_mock, 0,
[_amphora_mock], {},
_amphora_mock.id,
self.timeout_dict)
mock_amphora_repo_update.assert_called_once_with(
_session_mock, AMP_ID, status=constants.ERROR)
# Test exception, secondary amp
mock_amphora_repo_update.reset_mock()
mock_driver.update_amphora_listeners.side_effect = Exception('boom')
amp_list_update_obj.execute(_load_balancer_mock, 0,
[_amphora_mock], {},
'1234',
self.timeout_dict)
mock_amphora_repo_update.assert_not_called()
def test_listener_update(self,
mock_driver,
mock_generate_uuid,
@ -237,7 +252,7 @@ class TestAmphoraDriverTasks(base.TestCase):
# Test no listeners
mock_lb.listeners = None
listeners_reload_obj.execute(mock_lb, 0, None, {})
listeners_reload_obj.execute(mock_lb, 0, None, {}, amphora_mock.id)
mock_driver.reload.assert_not_called()
# Test with listeners
@ -250,6 +265,7 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_lb.listeners = [mock_listener]
listeners_reload_obj.execute(mock_lb, 0, [amphora_mock],
amphorae_status,
amphora_mock.id,
timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock,
self.timeout_dict)
@ -263,18 +279,30 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_driver.reload.reset_mock()
listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock],
amphorae_status,
_amphora_mock.id,
timeout_dict=self.timeout_dict)
mock_driver.reload.assert_not_called()
# Test with reload exception
mock_driver.reload.reset_mock()
listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], {},
amphora_mock.id,
timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock,
self.timeout_dict)
mock_amphora_repo_update.assert_called_once_with(
_session_mock, amphora_mock.id, status=constants.ERROR)
# Test with reload exception, secondary amp
mock_driver.reload.reset_mock()
mock_amphora_repo_update.reset_mock()
listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], {},
'1234',
timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock,
self.timeout_dict)
mock_amphora_repo_update.assert_not_called()
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_listener_prov_status_error')
def test_listeners_start(self,
@ -654,7 +682,8 @@ class TestAmphoraDriverTasks(base.TestCase):
amphora_update_vrrp_interface_obj = (
amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface())
amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], amphorae_status, timeout_dict)
0, [_amphora_mock], amphorae_status, _amphora_mock.id,
timeout_dict)
mock_driver.get_interface_from_ip.assert_called_once_with(
_amphora_mock, _amphora_mock.vrrp_ip, timeout_dict=timeout_dict)
mock_amphora_repo_update.assert_called_once_with(
@ -668,16 +697,23 @@ class TestAmphoraDriverTasks(base.TestCase):
}
}
amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], amphorae_status, timeout_dict)
0, [_amphora_mock], amphorae_status, _amphora_mock.id,
timeout_dict)
mock_driver.get_interface_from_ip.assert_not_called()
# Test with an exception
mock_amphora_repo_update.reset_mock()
amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], {}, timeout_dict)
0, [_amphora_mock], {}, _amphora_mock.id, timeout_dict)
mock_amphora_repo_update.assert_called_once_with(
_session_mock, _amphora_mock.id, status=constants.ERROR)
# Test with an exception, secondary amp
mock_amphora_repo_update.reset_mock()
amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], {}, '1234', timeout_dict)
mock_amphora_repo_update.assert_not_called()
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get')
def test_amphora_vrrp_update(self,
mock_lb_get,
@ -732,6 +768,7 @@ class TestAmphoraDriverTasks(base.TestCase):
amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config,
0, [_amphora_mock], amphorae_status,
'fakeint0',
_amphora_mock.id,
timeout_dict=self.timeout_dict)
mock_driver.update_vrrp_conf.assert_called_once_with(
_LB_mock, amphorae_network_config, _amphora_mock,
@ -747,17 +784,24 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_driver.update_vrrp_conf.reset_mock()
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
0, [_amphora_mock], amphorae_status,
None)
None, _amphora_mock.id)
mock_driver.update_vrrp_conf.assert_not_called()
# Test with an exception
mock_amphora_repo_update.reset_mock()
amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config,
0, [_amphora_mock], {},
'fakeint0')
0, [_amphora_mock], {}, 'fakeint0',
_amphora_mock.id)
mock_amphora_repo_update.assert_called_once_with(
_session_mock, _amphora_mock.id, status=constants.ERROR)
# Test with an exception, secondary amp
mock_amphora_repo_update.reset_mock()
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
0, [_amphora_mock], {}, 'fakeint0',
'1234')
mock_amphora_repo_update.assert_not_called()
def test_amphora_vrrp_start(self,
mock_driver,
mock_generate_uuid,
@ -793,6 +837,7 @@ class TestAmphoraDriverTasks(base.TestCase):
Exception('boom')]
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status,
_amphora_mock.id,
timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with(
_amphora_mock, self.timeout_dict)
@ -805,18 +850,29 @@ class TestAmphoraDriverTasks(base.TestCase):
}
}
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status,
_amphora_mock.id,
timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_not_called()
# Test with a start exception
mock_driver.start_vrrp_service.reset_mock()
amphora_vrrp_start_obj.execute(0, [_amphora_mock], {},
_amphora_mock.id,
timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with(
_amphora_mock, self.timeout_dict)
mock_amphora_repo_update.assert_called_once_with(
_session_mock, _amphora_mock.id, status=constants.ERROR)
# Test with a start exception, secondary amp
mock_driver.start_vrrp_service.reset_mock()
mock_amphora_repo_update.reset_mock()
amphora_vrrp_start_obj.execute(0, [_amphora_mock], {}, '1234',
timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with(
_amphora_mock, self.timeout_dict)
mock_amphora_repo_update.assert_not_called()
def test_amphora_compute_connectivity_wait(self,
mock_driver,
mock_generate_uuid,

View File

@ -390,10 +390,11 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires)
self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.requires)
self.assertEqual(2, len(vrrp_subflow.provides))
self.assertEqual(3, len(vrrp_subflow.requires))
self.assertEqual(4, len(vrrp_subflow.requires))
def test_get_vrrp_subflow_dont_create_vrrp_group(
self, mock_get_net_driver):

View File

@ -140,7 +140,9 @@ class TestAmphoraDriverTasks(base.TestCase):
amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate()
amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock],
amphorae_status, self.timeout_dict)
amphorae_status,
_amphora_mock[constants.ID],
self.timeout_dict)
mock_driver.update_amphora_listeners.assert_called_once_with(
_db_load_balancer_mock, _db_amphora_mock, self.timeout_dict)
@ -153,18 +155,31 @@ class TestAmphoraDriverTasks(base.TestCase):
}
}
amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock],
amphorae_status, self.timeout_dict)
amphorae_status,
_amphora_mock[constants.ID],
self.timeout_dict)
mock_driver.update_amphora_listeners.assert_not_called()
# Test exception
mock_driver.update_amphora_listeners.side_effect = Exception('boom')
amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], {},
_amphora_mock[constants.ID],
self.timeout_dict)
mock_amphora_repo_update.assert_called_once_with(
_session_mock, AMP_ID, status=constants.ERROR)
# Test exception, secondary amp
mock_amphora_repo_update.reset_mock()
mock_driver.update_amphora_listeners.side_effect = Exception('boom')
amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], {},
'1234',
self.timeout_dict)
mock_amphora_repo_update.assert_not_called()
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get')
def test_listeners_update(self,
mock_lb_get,
@ -223,7 +238,8 @@ class TestAmphoraDriverTasks(base.TestCase):
# Test no listeners
mock_lb.listeners = None
listeners_reload_obj.execute(mock_lb, 0, None, {})
listeners_reload_obj.execute(mock_lb, 0, None, {},
_amphora_mock[constants.ID])
mock_driver.reload.assert_not_called()
# Test with listeners
@ -236,6 +252,7 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_lb.listeners = [mock_listener]
listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock],
amphorae_status,
_amphora_mock[constants.ID],
timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock,
self.timeout_dict)
@ -249,12 +266,14 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_driver.reload.reset_mock()
listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock],
amphorae_status,
_amphora_mock[constants.ID],
timeout_dict=self.timeout_dict)
mock_driver.reload.assert_not_called()
# Test with reload exception
mock_driver.reload.reset_mock()
listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], {},
_amphora_mock[constants.ID],
timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock,
self.timeout_dict)
@ -262,6 +281,16 @@ class TestAmphoraDriverTasks(base.TestCase):
_session_mock, _amphora_mock[constants.ID],
status=constants.ERROR)
# Test with reload exception, secondary amp
mock_driver.reload.reset_mock()
mock_amphora_repo_update.reset_mock()
listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], {},
'1234',
timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock,
self.timeout_dict)
mock_amphora_repo_update.assert_not_called()
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_listener_prov_status_error')
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get')
@ -851,7 +880,8 @@ class TestAmphoraDriverTasks(base.TestCase):
amphora_update_vrrp_interface_obj = (
amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface())
amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], amphorae_status, timeout_dict)
0, [_amphora_mock], amphorae_status, _amphora_mock[constants.ID],
timeout_dict)
mock_driver.get_interface_from_ip.assert_called_once_with(
_db_amphora_mock, _db_amphora_mock.vrrp_ip,
timeout_dict=timeout_dict)
@ -866,16 +896,23 @@ class TestAmphoraDriverTasks(base.TestCase):
}
}
amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], amphorae_status, timeout_dict)
0, [_amphora_mock], amphorae_status, _amphora_mock[constants.ID],
timeout_dict)
mock_driver.get_interface_from_ip.assert_not_called()
# Test with an exception
mock_amphora_repo_update.reset_mock()
amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], {}, timeout_dict)
0, [_amphora_mock], {}, _amphora_mock[constants.ID], timeout_dict)
mock_amphora_repo_update.assert_called_once_with(
_session_mock, _db_amphora_mock.id, status=constants.ERROR)
# Test with an exception, secondary amp
mock_amphora_repo_update.reset_mock()
amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], {}, '1234', timeout_dict)
mock_amphora_repo_update.assert_not_called()
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get')
def test_amphora_vrrp_update(self,
mock_lb_get,
@ -935,6 +972,7 @@ class TestAmphoraDriverTasks(base.TestCase):
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
0, [_amphora_mock], amphorae_status,
'fakeint0',
_amphora_mock[constants.ID],
timeout_dict=self.timeout_dict)
mock_driver.update_vrrp_conf.assert_called_once_with(
_db_load_balancer_mock, amphorae_network_config, _db_amphora_mock,
@ -950,16 +988,24 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_driver.update_vrrp_conf.reset_mock()
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
0, [_amphora_mock], amphorae_status,
None)
None, _amphora_mock[constants.ID])
mock_driver.update_vrrp_conf.assert_not_called()
# Test with an exception
mock_amphora_repo_update.reset_mock()
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
0, [_amphora_mock], {}, 'fakeint0')
0, [_amphora_mock], {}, 'fakeint0',
_amphora_mock[constants.ID])
mock_amphora_repo_update.assert_called_once_with(
_session_mock, _db_amphora_mock.id, status=constants.ERROR)
# Test with an exception, secondary amp
mock_amphora_repo_update.reset_mock()
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
0, [_amphora_mock], {}, 'fakeint0',
'1234')
mock_amphora_repo_update.assert_not_called()
def test_amphora_vrrp_start(self,
mock_driver,
mock_generate_uuid,
@ -999,6 +1045,7 @@ class TestAmphoraDriverTasks(base.TestCase):
Exception('boom')]
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status,
_amphora_mock[constants.ID],
timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with(
_db_amphora_mock, self.timeout_dict)
@ -1011,18 +1058,29 @@ class TestAmphoraDriverTasks(base.TestCase):
}
}
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status,
_amphora_mock[constants.ID],
timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_not_called()
# Test with a start exception
mock_driver.start_vrrp_service.reset_mock()
amphora_vrrp_start_obj.execute(0, [_amphora_mock], {},
_amphora_mock[constants.ID],
timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with(
_db_amphora_mock, self.timeout_dict)
mock_amphora_repo_update.assert_called_once_with(
_session_mock, _db_amphora_mock.id, status=constants.ERROR)
# Test with a start exception, secondary amp
mock_driver.start_vrrp_service.reset_mock()
mock_amphora_repo_update.reset_mock()
amphora_vrrp_start_obj.execute(0, [_amphora_mock], {}, '1234',
timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with(
_db_amphora_mock, self.timeout_dict)
mock_amphora_repo_update.assert_not_called()
def test_amphora_compute_connectivity_wait(self,
mock_driver,
mock_generate_uuid,

View File

@ -0,0 +1,5 @@
---
fixes:
- |
Reduce the duration of the failovers of ACTIVE_STANDBY load balancers when
both amphorae are unreachable.