diff --git a/octavia/controller/worker/v1/flows/amphora_flows.py b/octavia/controller/worker/v1/flows/amphora_flows.py index bc25359c09..b91a316dc1 100644 --- a/octavia/controller/worker/v1/flows/amphora_flows.py +++ b/octavia/controller/worker/v1/flows/amphora_flows.py @@ -279,6 +279,7 @@ class AmphoraFlows(object): amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF, requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict}, provides=constants.AMP_VRRP_INT)) @@ -288,12 +289,14 @@ class AmphoraFlows(object): requires=(constants.LOADBALANCER_ID, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( name=sf_name + '-0-' + constants.AMP_VRRP_START, requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) @@ -302,6 +305,7 @@ class AmphoraFlows(object): amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF, requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict}, provides=constants.AMP_VRRP_INT)) @@ -311,11 +315,13 @@ class AmphoraFlows(object): requires=(constants.LOADBALANCER_ID, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( name=sf_name + '-1-' + constants.AMP_VRRP_START, requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) @@ -583,6 +589,7 @@ class AmphoraFlows(object): name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE, requires=(constants.LOADBALANCER, constants.AMPHORAE, constants.AMPHORAE_STATUS), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: amp_index, constants.TIMEOUT_DICT: timeout_dict})) @@ -611,6 +618,7 @@ class AmphoraFlows(object): constants.AMPHORA_RELOAD_LISTENER), requires=(constants.LOADBALANCER, constants.AMPHORAE, constants.AMPHORAE_STATUS), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: amp_index, constants.TIMEOUT_DICT: timeout_dict})) diff --git a/octavia/controller/worker/v1/flows/load_balancer_flows.py b/octavia/controller/worker/v1/flows/load_balancer_flows.py index 0512d39ef1..1d103ee1af 100644 --- a/octavia/controller/worker/v1/flows/load_balancer_flows.py +++ b/octavia/controller/worker/v1/flows/load_balancer_flows.py @@ -645,6 +645,7 @@ class LoadBalancerFlows(object): constants.AMP_LISTENER_UPDATE), requires=(constants.LOADBALANCER, constants.AMPHORAE, constants.AMPHORAE_STATUS), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) update_amps_subflow.add( @@ -653,6 +654,7 @@ class LoadBalancerFlows(object): constants.AMP_LISTENER_UPDATE), requires=(constants.LOADBALANCER, constants.AMPHORAE, constants.AMPHORAE_STATUS), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) @@ -677,6 +679,7 @@ class LoadBalancerFlows(object): name=(new_amp_role + '-' + constants.AMPHORA_RELOAD_LISTENER), requires=(constants.LOADBALANCER, constants.AMPHORAE), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) diff --git a/octavia/controller/worker/v1/tasks/amphora_driver_tasks.py b/octavia/controller/worker/v1/tasks/amphora_driver_tasks.py index 9e62574310..7e746c3d93 100644 --- a/octavia/controller/worker/v1/tasks/amphora_driver_tasks.py +++ b/octavia/controller/worker/v1/tasks/amphora_driver_tasks.py @@ -75,7 +75,7 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask): """Task to update the listeners on one amphora.""" def execute(self, loadbalancer, amphora_index, amphorae, - amphorae_status: dict, timeout_dict=None): + amphorae_status: dict, new_amphora_id: str, timeout_dict=None): # Note, we don't want this to cause a revert as it may be used # in a failover flow with both amps failing. Skip it and let # health manager fix it. @@ -97,8 +97,11 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask): LOG.error('Failed to update listeners on amphora %s. Skipping ' 'this amphora as it is failing to update due to: %s', amphora_id, str(e)) - self.amphora_repo.update(db_apis.get_session(), amphora_id, - status=constants.ERROR) + # Update only the status of the newly created amphora during the + # failover + if amphora_id == new_amphora_id: + self.amphora_repo.update(db_apis.get_session(), amphora_id, + status=constants.ERROR) class ListenersUpdate(BaseAmphoraTask): @@ -138,7 +141,7 @@ class AmphoraIndexListenersReload(BaseAmphoraTask): """Task to reload all listeners on an amphora.""" def execute(self, loadbalancer, amphora_index, amphorae, - amphorae_status: dict, timeout_dict=None): + amphorae_status: dict, new_amphora_id: str, timeout_dict=None): """Execute listener reload routines for listeners on an amphora.""" if amphorae is None: return @@ -158,8 +161,11 @@ class AmphoraIndexListenersReload(BaseAmphoraTask): LOG.warning('Failed to reload listeners on amphora %s. ' 'Skipping this amphora as it is failing to ' 'reload due to: %s', amphora_id, str(e)) - self.amphora_repo.update(db_apis.get_session(), amphora_id, - status=constants.ERROR) + # Update only the status of the newly created amphora during + # the failover + if amphora_id == new_amphora_id: + self.amphora_repo.update(db_apis.get_session(), amphora_id, + status=constants.ERROR) class ListenerDelete(BaseAmphoraTask): @@ -324,7 +330,7 @@ class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask): """Task to get and update the VRRP interface device name from amphora.""" def execute(self, amphora_index, amphorae, amphorae_status: dict, - timeout_dict=None): + new_amphora_id: str, timeout_dict=None): amphora_id = amphorae[amphora_index].id amphora_status = amphorae_status.get(amphora_id, {}) if amphora_status.get(constants.UNREACHABLE): @@ -341,8 +347,11 @@ class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask): LOG.error('Failed to get amphora VRRP interface on amphora ' '%s. Skipping this amphora as it is failing due to: ' '%s', amphora_id, str(e)) - self.amphora_repo.update(db_apis.get_session(), amphora_id, - status=constants.ERROR) + # Update only the status of the newly created amphora during the + # failover + if amphora_id == new_amphora_id: + self.amphora_repo.update(db_apis.get_session(), amphora_id, + status=constants.ERROR) return None self.amphora_repo.update(db_apis.get_session(), amphora_id, @@ -380,7 +389,7 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask): def execute(self, loadbalancer_id, amphorae_network_config, amphora_index, amphorae, amphorae_status: dict, amp_vrrp_int: Optional[str], - timeout_dict=None): + new_amphora_id: str, timeout_dict=None): """Execute update_vrrp_conf.""" # Note, we don't want this to cause a revert as it may be used # in a failover flow with both amps failing. Skip it and let @@ -403,8 +412,11 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask): LOG.error('Failed to update VRRP configuration amphora %s. ' 'Skipping this amphora as it is failing to update due ' 'to: %s', amphora_id, str(e)) - self.amphora_repo.update(db_apis.get_session(), amphora_id, - status=constants.ERROR) + # Update only the status of the newly created amphora during the + # failover + if amphora_id == new_amphora_id: + self.amphora_repo.update(db_apis.get_session(), amphora_id, + status=constants.ERROR) return LOG.debug("Uploaded VRRP configuration of amphora %s.", amphora_id) @@ -427,7 +439,7 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask): """ def execute(self, amphora_index, amphorae, amphorae_status: dict, - timeout_dict=None): + new_amphora_id: str, timeout_dict=None): amphora_id = amphorae[amphora_index].id amphora_status = amphorae_status.get(amphora_id, {}) if amphora_status.get(constants.UNREACHABLE): @@ -442,8 +454,11 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask): LOG.error('Failed to start VRRP on amphora %s. ' 'Skipping this amphora as it is failing to start due ' 'to: %s', amphora_id, str(e)) - self.amphora_repo.update(db_apis.get_session(), amphora_id, - status=constants.ERROR) + # Update only the status of the newly created amphora during the + # failover + if amphora_id == new_amphora_id: + self.amphora_repo.update(db_apis.get_session(), amphora_id, + status=constants.ERROR) return LOG.debug("Started VRRP on amphora %s.", amphorae[amphora_index].id) diff --git a/octavia/controller/worker/v2/flows/amphora_flows.py b/octavia/controller/worker/v2/flows/amphora_flows.py index 5a25461899..8cced35ed2 100644 --- a/octavia/controller/worker/v2/flows/amphora_flows.py +++ b/octavia/controller/worker/v2/flows/amphora_flows.py @@ -265,6 +265,7 @@ class AmphoraFlows(object): amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF, requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict}, provides=constants.AMP_VRRP_INT)) @@ -274,12 +275,14 @@ class AmphoraFlows(object): requires=(constants.LOADBALANCER_ID, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( name=sf_name + '-0-' + constants.AMP_VRRP_START, requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) @@ -288,6 +291,7 @@ class AmphoraFlows(object): amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF, requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict}, provides=constants.AMP_VRRP_INT)) @@ -297,11 +301,13 @@ class AmphoraFlows(object): requires=(constants.LOADBALANCER_ID, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( name=sf_name + '-1-' + constants.AMP_VRRP_START, requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) @@ -569,6 +575,7 @@ class AmphoraFlows(object): name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE, requires=(constants.LOADBALANCER, constants.AMPHORAE, constants.AMPHORAE_STATUS), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: amp_index, constants.TIMEOUT_DICT: timeout_dict})) @@ -597,6 +604,7 @@ class AmphoraFlows(object): constants.AMPHORA_RELOAD_LISTENER), requires=(constants.LOADBALANCER, constants.AMPHORAE, constants.AMPHORAE_STATUS), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: amp_index, constants.TIMEOUT_DICT: timeout_dict})) diff --git a/octavia/controller/worker/v2/flows/load_balancer_flows.py b/octavia/controller/worker/v2/flows/load_balancer_flows.py index 892915820c..503c7bcaeb 100644 --- a/octavia/controller/worker/v2/flows/load_balancer_flows.py +++ b/octavia/controller/worker/v2/flows/load_balancer_flows.py @@ -635,6 +635,7 @@ class LoadBalancerFlows(object): constants.AMP_LISTENER_UPDATE), requires=(constants.LOADBALANCER, constants.AMPHORAE, constants.AMPHORAE_STATUS), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) update_amps_subflow.add( @@ -643,6 +644,7 @@ class LoadBalancerFlows(object): constants.AMP_LISTENER_UPDATE), requires=(constants.LOADBALANCER, constants.AMPHORAE, constants.AMPHORAE_STATUS), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) @@ -667,6 +669,7 @@ class LoadBalancerFlows(object): name=(new_amp_role + '-' + constants.AMPHORA_RELOAD_LISTENER), requires=(constants.LOADBALANCER, constants.AMPHORAE), + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) diff --git a/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py b/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py index eb312e5a14..52be7eb2bb 100644 --- a/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py +++ b/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py @@ -103,7 +103,7 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask): """Task to update the listeners on one amphora.""" def execute(self, loadbalancer, amphora_index, amphorae, - amphorae_status: dict, timeout_dict=()): + amphorae_status: dict, new_amphora_id: str, timeout_dict=()): # Note, we don't want this to cause a revert as it may be used # in a failover flow with both amps failing. Skip it and let # health manager fix it. @@ -130,8 +130,11 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask): LOG.error('Failed to update listeners on amphora %s. Skipping ' 'this amphora as it is failing to update due to: %s', amphora_id, str(e)) - self.amphora_repo.update(db_apis.get_session(), amphora_id, - status=constants.ERROR) + # Update only the status of the newly created amphora during the + # failover + if amphora_id == new_amphora_id: + self.amphora_repo.update(db_apis.get_session(), amphora_id, + status=constants.ERROR) class ListenersUpdate(BaseAmphoraTask): @@ -188,7 +191,7 @@ class AmphoraIndexListenersReload(BaseAmphoraTask): """Task to reload all listeners on an amphora.""" def execute(self, loadbalancer, amphora_index, amphorae, - amphorae_status: dict, timeout_dict=None): + amphorae_status: dict, new_amphora_id: str, timeout_dict=None): """Execute listener reload routines for listeners on an amphora.""" if amphorae is None: return @@ -214,8 +217,11 @@ class AmphoraIndexListenersReload(BaseAmphoraTask): LOG.warning('Failed to reload listeners on amphora %s. ' 'Skipping this amphora as it is failing to ' 'reload due to: %s', amphora_id, str(e)) - self.amphora_repo.update(db_apis.get_session(), amphora_id, - status=constants.ERROR) + # Update only the status of the newly created amphora during + # the failover + if amphora_id == new_amphora_id: + self.amphora_repo.update(db_apis.get_session(), amphora_id, + status=constants.ERROR) class ListenerDelete(BaseAmphoraTask): @@ -440,7 +446,7 @@ class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask): """Task to get and update the VRRP interface device name from amphora.""" def execute(self, amphora_index, amphorae, amphorae_status: dict, - timeout_dict=None): + new_amphora_id: str, timeout_dict=None): amphora_id = amphorae[amphora_index][constants.ID] amphora_status = amphorae_status.get(amphora_id, {}) if amphora_status.get(constants.UNREACHABLE): @@ -460,8 +466,11 @@ class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask): LOG.error('Failed to get amphora VRRP interface on amphora ' '%s. Skipping this amphora as it is failing due to: ' '%s', amphora_id, str(e)) - self.amphora_repo.update(db_apis.get_session(), amphora_id, - status=constants.ERROR) + # Update only the status of the newly created amphora during the + # failover + if amphora_id == new_amphora_id: + self.amphora_repo.update(db_apis.get_session(), amphora_id, + status=constants.ERROR) return None self.amphora_repo.update(db_apis.get_session(), amphora_id, @@ -504,7 +513,7 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask): def execute(self, loadbalancer_id, amphorae_network_config, amphora_index, amphorae, amphorae_status: dict, amp_vrrp_int: Optional[str], - timeout_dict=None): + new_amphora_id: str, timeout_dict=None): """Execute update_vrrp_conf.""" # Note, we don't want this to cause a revert as it may be used # in a failover flow with both amps failing. Skip it and let @@ -530,8 +539,11 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask): LOG.error('Failed to update VRRP configuration amphora %s. ' 'Skipping this amphora as it is failing to update due ' 'to: %s', amphora_id, str(e)) - self.amphora_repo.update(db_apis.get_session(), amphora_id, - status=constants.ERROR) + # Update only the status of the newly created amphora during the + # failover + if amphora_id == new_amphora_id: + self.amphora_repo.update(db_apis.get_session(), amphora_id, + status=constants.ERROR) return LOG.debug("Uploaded VRRP configuration of amphora %s.", amphora_id) @@ -558,7 +570,7 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask): """ def execute(self, amphora_index, amphorae, amphorae_status: dict, - timeout_dict=None): + new_amphora_id: str, timeout_dict=None): # TODO(johnsom) Optimize this to use the dicts and not need the # DB lookups amphora_id = amphorae[amphora_index][constants.ID] @@ -575,8 +587,11 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask): LOG.error('Failed to start VRRP on amphora %s. ' 'Skipping this amphora as it is failing to start due ' 'to: %s', amphora_id, str(e)) - self.amphora_repo.update(db_apis.get_session(), amphora_id, - status=constants.ERROR) + # Update only the status of the newly created amphora during the + # failover + if amphora_id == new_amphora_id: + self.amphora_repo.update(db_apis.get_session(), amphora_id, + status=constants.ERROR) return LOG.debug("Started VRRP on amphora %s.", amphorae[amphora_index][constants.ID]) diff --git a/octavia/tests/unit/controller/worker/v1/flows/test_amphora_flows.py b/octavia/tests/unit/controller/worker/v1/flows/test_amphora_flows.py index 77ed34fe89..403e17af1d 100644 --- a/octavia/tests/unit/controller/worker/v1/flows/test_amphora_flows.py +++ b/octavia/tests/unit/controller/worker/v1/flows/test_amphora_flows.py @@ -346,10 +346,11 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) + self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.requires) self.assertEqual(2, len(vrrp_subflow.provides)) - self.assertEqual(3, len(vrrp_subflow.requires)) + self.assertEqual(4, len(vrrp_subflow.requires)) def test_get_vrrp_subflow_dont_create_vrrp_group( self, mock_get_net_driver): diff --git a/octavia/tests/unit/controller/worker/v1/tasks/test_amphora_driver_tasks.py b/octavia/tests/unit/controller/worker/v1/tasks/test_amphora_driver_tasks.py index 31ad29d61d..070e571e7e 100644 --- a/octavia/tests/unit/controller/worker/v1/tasks/test_amphora_driver_tasks.py +++ b/octavia/tests/unit/controller/worker/v1/tasks/test_amphora_driver_tasks.py @@ -134,6 +134,7 @@ class TestAmphoraDriverTasks(base.TestCase): amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate() amp_list_update_obj.execute(_load_balancer_mock, 0, [_amphora_mock], amphorae_status, + _amphora_mock.id, self.timeout_dict) mock_driver.update_amphora_listeners.assert_called_once_with( @@ -147,7 +148,9 @@ class TestAmphoraDriverTasks(base.TestCase): } } amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], - amphorae_status, self.timeout_dict) + amphorae_status, + _amphora_mock.id, + self.timeout_dict) mock_driver.update_amphora_listeners.assert_not_called() # Test exception @@ -155,11 +158,23 @@ class TestAmphoraDriverTasks(base.TestCase): amp_list_update_obj.execute(_load_balancer_mock, 0, [_amphora_mock], {}, + _amphora_mock.id, self.timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, AMP_ID, status=constants.ERROR) + # Test exception, secondary amp + mock_amphora_repo_update.reset_mock() + mock_driver.update_amphora_listeners.side_effect = Exception('boom') + + amp_list_update_obj.execute(_load_balancer_mock, 0, + [_amphora_mock], {}, + '1234', + self.timeout_dict) + + mock_amphora_repo_update.assert_not_called() + def test_listener_update(self, mock_driver, mock_generate_uuid, @@ -237,7 +252,7 @@ class TestAmphoraDriverTasks(base.TestCase): # Test no listeners mock_lb.listeners = None - listeners_reload_obj.execute(mock_lb, 0, None, {}) + listeners_reload_obj.execute(mock_lb, 0, None, {}, amphora_mock.id) mock_driver.reload.assert_not_called() # Test with listeners @@ -250,6 +265,7 @@ class TestAmphoraDriverTasks(base.TestCase): mock_lb.listeners = [mock_listener] listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], amphorae_status, + amphora_mock.id, timeout_dict=self.timeout_dict) mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, self.timeout_dict) @@ -263,18 +279,30 @@ class TestAmphoraDriverTasks(base.TestCase): mock_driver.reload.reset_mock() listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], amphorae_status, + _amphora_mock.id, timeout_dict=self.timeout_dict) mock_driver.reload.assert_not_called() # Test with reload exception mock_driver.reload.reset_mock() listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], {}, + amphora_mock.id, timeout_dict=self.timeout_dict) mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, self.timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, amphora_mock.id, status=constants.ERROR) + # Test with reload exception, secondary amp + mock_driver.reload.reset_mock() + mock_amphora_repo_update.reset_mock() + listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], {}, + '1234', + timeout_dict=self.timeout_dict) + mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock, + self.timeout_dict) + mock_amphora_repo_update.assert_not_called() + @mock.patch('octavia.controller.worker.task_utils.TaskUtils.' 'mark_listener_prov_status_error') def test_listeners_start(self, @@ -654,7 +682,8 @@ class TestAmphoraDriverTasks(base.TestCase): amphora_update_vrrp_interface_obj = ( amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface()) amphora_update_vrrp_interface_obj.execute( - 0, [_amphora_mock], amphorae_status, timeout_dict) + 0, [_amphora_mock], amphorae_status, _amphora_mock.id, + timeout_dict) mock_driver.get_interface_from_ip.assert_called_once_with( _amphora_mock, _amphora_mock.vrrp_ip, timeout_dict=timeout_dict) mock_amphora_repo_update.assert_called_once_with( @@ -668,16 +697,23 @@ class TestAmphoraDriverTasks(base.TestCase): } } amphora_update_vrrp_interface_obj.execute( - 0, [_amphora_mock], amphorae_status, timeout_dict) + 0, [_amphora_mock], amphorae_status, _amphora_mock.id, + timeout_dict) mock_driver.get_interface_from_ip.assert_not_called() # Test with an exception mock_amphora_repo_update.reset_mock() amphora_update_vrrp_interface_obj.execute( - 0, [_amphora_mock], {}, timeout_dict) + 0, [_amphora_mock], {}, _amphora_mock.id, timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, _amphora_mock.id, status=constants.ERROR) + # Test with an exception, secondary amp + mock_amphora_repo_update.reset_mock() + amphora_update_vrrp_interface_obj.execute( + 0, [_amphora_mock], {}, '1234', timeout_dict) + mock_amphora_repo_update.assert_not_called() + @mock.patch('octavia.db.repositories.LoadBalancerRepository.get') def test_amphora_vrrp_update(self, mock_lb_get, @@ -732,6 +768,7 @@ class TestAmphoraDriverTasks(base.TestCase): amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config, 0, [_amphora_mock], amphorae_status, 'fakeint0', + _amphora_mock.id, timeout_dict=self.timeout_dict) mock_driver.update_vrrp_conf.assert_called_once_with( _LB_mock, amphorae_network_config, _amphora_mock, @@ -747,17 +784,24 @@ class TestAmphoraDriverTasks(base.TestCase): mock_driver.update_vrrp_conf.reset_mock() amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, 0, [_amphora_mock], amphorae_status, - None) + None, _amphora_mock.id) mock_driver.update_vrrp_conf.assert_not_called() # Test with an exception mock_amphora_repo_update.reset_mock() amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config, - 0, [_amphora_mock], {}, - 'fakeint0') + 0, [_amphora_mock], {}, 'fakeint0', + _amphora_mock.id) mock_amphora_repo_update.assert_called_once_with( _session_mock, _amphora_mock.id, status=constants.ERROR) + # Test with an exception, secondary amp + mock_amphora_repo_update.reset_mock() + amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, + 0, [_amphora_mock], {}, 'fakeint0', + '1234') + mock_amphora_repo_update.assert_not_called() + def test_amphora_vrrp_start(self, mock_driver, mock_generate_uuid, @@ -793,6 +837,7 @@ class TestAmphoraDriverTasks(base.TestCase): Exception('boom')] amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status, + _amphora_mock.id, timeout_dict=self.timeout_dict) mock_driver.start_vrrp_service.assert_called_once_with( _amphora_mock, self.timeout_dict) @@ -805,18 +850,29 @@ class TestAmphoraDriverTasks(base.TestCase): } } amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status, + _amphora_mock.id, timeout_dict=self.timeout_dict) mock_driver.start_vrrp_service.assert_not_called() # Test with a start exception mock_driver.start_vrrp_service.reset_mock() amphora_vrrp_start_obj.execute(0, [_amphora_mock], {}, + _amphora_mock.id, timeout_dict=self.timeout_dict) mock_driver.start_vrrp_service.assert_called_once_with( _amphora_mock, self.timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, _amphora_mock.id, status=constants.ERROR) + # Test with a start exception, secondary amp + mock_driver.start_vrrp_service.reset_mock() + mock_amphora_repo_update.reset_mock() + amphora_vrrp_start_obj.execute(0, [_amphora_mock], {}, '1234', + timeout_dict=self.timeout_dict) + mock_driver.start_vrrp_service.assert_called_once_with( + _amphora_mock, self.timeout_dict) + mock_amphora_repo_update.assert_not_called() + def test_amphora_compute_connectivity_wait(self, mock_driver, mock_generate_uuid, diff --git a/octavia/tests/unit/controller/worker/v2/flows/test_amphora_flows.py b/octavia/tests/unit/controller/worker/v2/flows/test_amphora_flows.py index 3f5ebd0917..c1b5d951b1 100644 --- a/octavia/tests/unit/controller/worker/v2/flows/test_amphora_flows.py +++ b/octavia/tests/unit/controller/worker/v2/flows/test_amphora_flows.py @@ -390,10 +390,11 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) + self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.requires) self.assertEqual(2, len(vrrp_subflow.provides)) - self.assertEqual(3, len(vrrp_subflow.requires)) + self.assertEqual(4, len(vrrp_subflow.requires)) def test_get_vrrp_subflow_dont_create_vrrp_group( self, mock_get_net_driver): diff --git a/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py b/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py index d443fb5321..505eff7367 100644 --- a/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py +++ b/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py @@ -140,7 +140,9 @@ class TestAmphoraDriverTasks(base.TestCase): amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate() amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], - amphorae_status, self.timeout_dict) + amphorae_status, + _amphora_mock[constants.ID], + self.timeout_dict) mock_driver.update_amphora_listeners.assert_called_once_with( _db_load_balancer_mock, _db_amphora_mock, self.timeout_dict) @@ -153,18 +155,31 @@ class TestAmphoraDriverTasks(base.TestCase): } } amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], - amphorae_status, self.timeout_dict) + amphorae_status, + _amphora_mock[constants.ID], + self.timeout_dict) mock_driver.update_amphora_listeners.assert_not_called() # Test exception mock_driver.update_amphora_listeners.side_effect = Exception('boom') amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], {}, + _amphora_mock[constants.ID], self.timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, AMP_ID, status=constants.ERROR) + # Test exception, secondary amp + mock_amphora_repo_update.reset_mock() + mock_driver.update_amphora_listeners.side_effect = Exception('boom') + + amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], {}, + '1234', + self.timeout_dict) + + mock_amphora_repo_update.assert_not_called() + @mock.patch('octavia.db.repositories.LoadBalancerRepository.get') def test_listeners_update(self, mock_lb_get, @@ -223,7 +238,8 @@ class TestAmphoraDriverTasks(base.TestCase): # Test no listeners mock_lb.listeners = None - listeners_reload_obj.execute(mock_lb, 0, None, {}) + listeners_reload_obj.execute(mock_lb, 0, None, {}, + _amphora_mock[constants.ID]) mock_driver.reload.assert_not_called() # Test with listeners @@ -236,6 +252,7 @@ class TestAmphoraDriverTasks(base.TestCase): mock_lb.listeners = [mock_listener] listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], amphorae_status, + _amphora_mock[constants.ID], timeout_dict=self.timeout_dict) mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock, self.timeout_dict) @@ -249,12 +266,14 @@ class TestAmphoraDriverTasks(base.TestCase): mock_driver.reload.reset_mock() listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], amphorae_status, + _amphora_mock[constants.ID], timeout_dict=self.timeout_dict) mock_driver.reload.assert_not_called() # Test with reload exception mock_driver.reload.reset_mock() listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], {}, + _amphora_mock[constants.ID], timeout_dict=self.timeout_dict) mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock, self.timeout_dict) @@ -262,6 +281,16 @@ class TestAmphoraDriverTasks(base.TestCase): _session_mock, _amphora_mock[constants.ID], status=constants.ERROR) + # Test with reload exception, secondary amp + mock_driver.reload.reset_mock() + mock_amphora_repo_update.reset_mock() + listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], {}, + '1234', + timeout_dict=self.timeout_dict) + mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock, + self.timeout_dict) + mock_amphora_repo_update.assert_not_called() + @mock.patch('octavia.controller.worker.task_utils.TaskUtils.' 'mark_listener_prov_status_error') @mock.patch('octavia.db.repositories.LoadBalancerRepository.get') @@ -775,7 +804,8 @@ class TestAmphoraDriverTasks(base.TestCase): amphora_update_vrrp_interface_obj = ( amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface()) amphora_update_vrrp_interface_obj.execute( - 0, [_amphora_mock], amphorae_status, timeout_dict) + 0, [_amphora_mock], amphorae_status, _amphora_mock[constants.ID], + timeout_dict) mock_driver.get_interface_from_ip.assert_called_once_with( _db_amphora_mock, _db_amphora_mock.vrrp_ip, timeout_dict=timeout_dict) @@ -790,16 +820,23 @@ class TestAmphoraDriverTasks(base.TestCase): } } amphora_update_vrrp_interface_obj.execute( - 0, [_amphora_mock], amphorae_status, timeout_dict) + 0, [_amphora_mock], amphorae_status, _amphora_mock[constants.ID], + timeout_dict) mock_driver.get_interface_from_ip.assert_not_called() # Test with an exception mock_amphora_repo_update.reset_mock() amphora_update_vrrp_interface_obj.execute( - 0, [_amphora_mock], {}, timeout_dict) + 0, [_amphora_mock], {}, _amphora_mock[constants.ID], timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, _db_amphora_mock.id, status=constants.ERROR) + # Test with an exception, secondary amp + mock_amphora_repo_update.reset_mock() + amphora_update_vrrp_interface_obj.execute( + 0, [_amphora_mock], {}, '1234', timeout_dict) + mock_amphora_repo_update.assert_not_called() + @mock.patch('octavia.db.repositories.LoadBalancerRepository.get') def test_amphora_vrrp_update(self, mock_lb_get, @@ -859,6 +896,7 @@ class TestAmphoraDriverTasks(base.TestCase): amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, 0, [_amphora_mock], amphorae_status, 'fakeint0', + _amphora_mock[constants.ID], timeout_dict=self.timeout_dict) mock_driver.update_vrrp_conf.assert_called_once_with( _db_load_balancer_mock, amphorae_network_config, _db_amphora_mock, @@ -874,16 +912,24 @@ class TestAmphoraDriverTasks(base.TestCase): mock_driver.update_vrrp_conf.reset_mock() amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, 0, [_amphora_mock], amphorae_status, - None) + None, _amphora_mock[constants.ID]) mock_driver.update_vrrp_conf.assert_not_called() # Test with an exception mock_amphora_repo_update.reset_mock() amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, - 0, [_amphora_mock], {}, 'fakeint0') + 0, [_amphora_mock], {}, 'fakeint0', + _amphora_mock[constants.ID]) mock_amphora_repo_update.assert_called_once_with( _session_mock, _db_amphora_mock.id, status=constants.ERROR) + # Test with an exception, secondary amp + mock_amphora_repo_update.reset_mock() + amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, + 0, [_amphora_mock], {}, 'fakeint0', + '1234') + mock_amphora_repo_update.assert_not_called() + def test_amphora_vrrp_start(self, mock_driver, mock_generate_uuid, @@ -923,6 +969,7 @@ class TestAmphoraDriverTasks(base.TestCase): Exception('boom')] amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status, + _amphora_mock[constants.ID], timeout_dict=self.timeout_dict) mock_driver.start_vrrp_service.assert_called_once_with( _db_amphora_mock, self.timeout_dict) @@ -935,18 +982,29 @@ class TestAmphoraDriverTasks(base.TestCase): } } amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status, + _amphora_mock[constants.ID], timeout_dict=self.timeout_dict) mock_driver.start_vrrp_service.assert_not_called() # Test with a start exception mock_driver.start_vrrp_service.reset_mock() amphora_vrrp_start_obj.execute(0, [_amphora_mock], {}, + _amphora_mock[constants.ID], timeout_dict=self.timeout_dict) mock_driver.start_vrrp_service.assert_called_once_with( _db_amphora_mock, self.timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, _db_amphora_mock.id, status=constants.ERROR) + # Test with a start exception, secondary amp + mock_driver.start_vrrp_service.reset_mock() + mock_amphora_repo_update.reset_mock() + amphora_vrrp_start_obj.execute(0, [_amphora_mock], {}, '1234', + timeout_dict=self.timeout_dict) + mock_driver.start_vrrp_service.assert_called_once_with( + _db_amphora_mock, self.timeout_dict) + mock_amphora_repo_update.assert_not_called() + def test_amphora_compute_connectivity_wait(self, mock_driver, mock_generate_uuid, diff --git a/releasenotes/notes/reduce-failover-duration-active-standby-amphora-in-error-3c1d75bc7d9b169f.yaml b/releasenotes/notes/reduce-failover-duration-active-standby-amphora-in-error-3c1d75bc7d9b169f.yaml new file mode 100644 index 0000000000..7c04516e99 --- /dev/null +++ b/releasenotes/notes/reduce-failover-duration-active-standby-amphora-in-error-3c1d75bc7d9b169f.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + Reduce the duration of the failovers of ACTIVE_STANDBY load balancers when + both amphorae are unreachable.