diff --git a/octavia/amphorae/drivers/driver_base.py b/octavia/amphorae/drivers/driver_base.py index 20ddde8470..c4ac3f6d6b 100644 --- a/octavia/amphorae/drivers/driver_base.py +++ b/octavia/amphorae/drivers/driver_base.py @@ -14,6 +14,9 @@ # under the License. import abc +from typing import Optional + +from octavia.db import models as db_models class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta): @@ -236,6 +239,19 @@ class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta): :type timeout_dict: dict """ + @abc.abstractmethod + def check(self, amphora: db_models.Amphora, + timeout_dict: Optional[dict] = None): + """Check connectivity to the amphora. + + :param amphora: The amphora to query. + :param timeout_dict: Dictionary of timeout values for calls to the + amphora. May contain: req_conn_timeout, + req_read_timeout, conn_max_retries, + conn_retry_interval + :raises TimeOutException: The amphora didn't reply + """ + class VRRPDriverMixin(object, metaclass=abc.ABCMeta): """Abstract mixin class for VRRP support in loadbalancer amphorae diff --git a/octavia/amphorae/drivers/haproxy/rest_api_driver.py b/octavia/amphorae/drivers/haproxy/rest_api_driver.py index 52b2b23b83..36d0c02257 100644 --- a/octavia/amphorae/drivers/haproxy/rest_api_driver.py +++ b/octavia/amphorae/drivers/haproxy/rest_api_driver.py @@ -111,6 +111,11 @@ class HaproxyAmphoraLoadBalancerDriver( return api_version + def check(self, amphora: db_models.Amphora, + timeout_dict: Optional[dict] = None): + """Check connectivity to the amphora.""" + self._populate_amphora_api_version(amphora, timeout_dict) + def update_amphora_listeners(self, loadbalancer, amphora, timeout_dict=None): """Update the amphora with a new configuration. @@ -579,15 +584,15 @@ class HaproxyAmphoraLoadBalancerDriver( req_read_timeout, conn_max_retries, conn_retry_interval :type timeout_dict: dict - :returns: None if not found, the interface name string if found. + :returns: the interface name string if found. + :raises octavia.amphorae.drivers.haproxy.exceptions.NotFound: + No interface found on the amphora + :raises TimeOutException: The amphora didn't reply """ - try: - self._populate_amphora_api_version(amphora, timeout_dict) - response_json = self.clients[amphora.api_version].get_interface( - amphora, ip_address, timeout_dict, log_error=False) - return response_json.get('interface', None) - except (exc.NotFound, driver_except.TimeOutException): - return None + self._populate_amphora_api_version(amphora, timeout_dict) + response_json = self.clients[amphora.api_version].get_interface( + amphora, ip_address, timeout_dict, log_error=False) + return response_json.get('interface', None) # Check a custom hostname diff --git a/octavia/amphorae/drivers/noop_driver/driver.py b/octavia/amphorae/drivers/noop_driver/driver.py index fd467a8586..f007fa8d17 100644 --- a/octavia/amphorae/drivers/noop_driver/driver.py +++ b/octavia/amphorae/drivers/noop_driver/driver.py @@ -196,3 +196,6 @@ class NoopAmphoraLoadBalancerDriver( def reload_vrrp_service(self, loadbalancer): pass + + def check(self, amphora, timeout_dict=None): + pass diff --git a/octavia/common/constants.py b/octavia/common/constants.py index f01b3e60b2..da566fe536 100644 --- a/octavia/common/constants.py +++ b/octavia/common/constants.py @@ -313,6 +313,7 @@ AMPHORA_INDEX = 'amphora_index' AMPHORA_NETWORK_CONFIG = 'amphora_network_config' AMPHORAE = 'amphorae' AMPHORAE_NETWORK_CONFIG = 'amphorae_network_config' +AMPHORAE_STATUS = 'amphorae_status' AMPS_DATA = 'amps_data' ANTI_AFFINITY = 'anti-affinity' ATTEMPT_NUMBER = 'attempt_number' @@ -387,6 +388,7 @@ MESSAGE = 'message' NAME = 'name' NETWORK = 'network' NETWORK_ID = 'network_id' +NEW_AMPHORA_ID = 'new_amphora_id' NEXTHOP = 'nexthop' NICS = 'nics' OBJECT = 'object' @@ -435,6 +437,7 @@ TLS_CERTIFICATE_ID = 'tls_certificate_id' TLS_CONTAINER_ID = 'tls_container_id' TOPOLOGY = 'topology' TOTAL_CONNECTIONS = 'total_connections' +UNREACHABLE = 'unreachable' UPDATED_AT = 'updated_at' UPDATE_DICT = 'update_dict' UPDATED_PORTS = 'updated_ports' @@ -562,6 +565,7 @@ ADMIN_DOWN_PORT = 'admin-down-port' AMPHORA_POST_VIP_PLUG = 'amphora-post-vip-plug' AMPHORA_RELOAD_LISTENER = 'amphora-reload-listener' AMPHORA_TO_ERROR_ON_REVERT = 'amphora-to-error-on-revert' +AMPHORAE_GET_CONNECTIVITY_STATUS = 'amphorae-get-connectivity-status' AMPHORAE_POST_NETWORK_PLUG = 'amphorae-post-network-plug' ATTACH_PORT = 'attach-port' CALCULATE_AMPHORA_DELTA = 'calculate-amphora-delta' diff --git a/octavia/controller/worker/v2/flows/amphora_flows.py b/octavia/controller/worker/v2/flows/amphora_flows.py index 54df1ff8f9..3f1a541d2f 100644 --- a/octavia/controller/worker/v2/flows/amphora_flows.py +++ b/octavia/controller/worker/v2/flows/amphora_flows.py @@ -226,7 +226,8 @@ class AmphoraFlows(object): return delete_amphora_flow def get_vrrp_subflow(self, prefix, timeout_dict=None, - create_vrrp_group=True): + create_vrrp_group=True, + get_amphorae_status=True): sf_name = prefix + '-' + constants.GET_VRRP_SUBFLOW vrrp_subflow = linear_flow.Flow(sf_name) @@ -242,6 +243,17 @@ class AmphoraFlows(object): requires=constants.LOADBALANCER_ID, provides=constants.AMPHORAE_NETWORK_CONFIG)) + if get_amphorae_status: + # Get the amphorae_status dict in case the caller hasn't fetched + # it yet. + vrrp_subflow.add( + amphora_driver_tasks.AmphoraeGetConnectivityStatus( + name=constants.AMPHORAE_GET_CONNECTIVITY_STATUS, + requires=constants.AMPHORAE, + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, + inject={constants.TIMEOUT_DICT: timeout_dict}, + provides=constants.AMPHORAE_STATUS)) + # VRRP update needs to be run on all amphora to update # their peer configurations. So parallelize this with an # unordered subflow. @@ -252,7 +264,7 @@ class AmphoraFlows(object): amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict}, provides=constants.AMP_VRRP_INT)) @@ -261,13 +273,13 @@ class AmphoraFlows(object): name=sf_name + '-0-' + constants.AMP_VRRP_UPDATE, requires=(constants.LOADBALANCER_ID, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, - constants.AMP_VRRP_INT), + constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( name=sf_name + '-0-' + constants.AMP_VRRP_START, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) @@ -275,7 +287,7 @@ class AmphoraFlows(object): amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict}, provides=constants.AMP_VRRP_INT)) @@ -284,12 +296,12 @@ class AmphoraFlows(object): name=sf_name + '-1-' + constants.AMP_VRRP_UPDATE, requires=(constants.LOADBALANCER_ID, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, - constants.AMP_VRRP_INT), + constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( name=sf_name + '-1-' + constants.AMP_VRRP_START, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) @@ -538,6 +550,14 @@ class AmphoraFlows(object): constants.CONN_RETRY_INTERVAL: CONF.haproxy_amphora.active_connection_retry_interval} + failover_amp_flow.add( + amphora_driver_tasks.AmphoraeGetConnectivityStatus( + name=constants.AMPHORAE_GET_CONNECTIVITY_STATUS, + requires=constants.AMPHORAE, + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, + inject={constants.TIMEOUT_DICT: timeout_dict}, + provides=constants.AMPHORAE_STATUS)) + # Listeners update needs to be run on all amphora to update # their peer configurations. So parallelize this with an # unordered subflow. @@ -548,7 +568,8 @@ class AmphoraFlows(object): update_amps_subflow.add( amphora_driver_tasks.AmphoraIndexListenerUpdate( name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE, - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: amp_index, constants.TIMEOUT_DICT: timeout_dict})) @@ -558,7 +579,8 @@ class AmphoraFlows(object): if lb_amp_count == 2: failover_amp_flow.add( self.get_vrrp_subflow(constants.GET_VRRP_SUBFLOW, - timeout_dict, create_vrrp_group=False)) + timeout_dict, create_vrrp_group=False, + get_amphorae_status=False)) # Reload the listener. This needs to be done here because # it will create the required haproxy check scripts for @@ -574,7 +596,8 @@ class AmphoraFlows(object): amphora_driver_tasks.AmphoraIndexListenersReload( name=(str(amp_index) + '-' + constants.AMPHORA_RELOAD_LISTENER), - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: amp_index, constants.TIMEOUT_DICT: timeout_dict})) diff --git a/octavia/controller/worker/v2/flows/load_balancer_flows.py b/octavia/controller/worker/v2/flows/load_balancer_flows.py index df1a3a48ec..7e3626da34 100644 --- a/octavia/controller/worker/v2/flows/load_balancer_flows.py +++ b/octavia/controller/worker/v2/flows/load_balancer_flows.py @@ -637,6 +637,14 @@ class LoadBalancerFlows(object): requires=constants.LOADBALANCER_ID, provides=constants.AMPHORAE)) + failover_LB_flow.add( + amphora_driver_tasks.AmphoraeGetConnectivityStatus( + name=(new_amp_role + '-' + + constants.AMPHORAE_GET_CONNECTIVITY_STATUS), + requires=constants.AMPHORAE, + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, + provides=constants.AMPHORAE_STATUS)) + # Listeners update needs to be run on all amphora to update # their peer configurations. So parallelize this with an # unordered subflow. @@ -651,14 +659,16 @@ class LoadBalancerFlows(object): amphora_driver_tasks.AmphoraIndexListenerUpdate( name=(constants.AMPHORA + '-0-' + constants.AMP_LISTENER_UPDATE), - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) update_amps_subflow.add( amphora_driver_tasks.AmphoraIndexListenerUpdate( name=(constants.AMPHORA + '-1-' + constants.AMP_LISTENER_UPDATE), - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) @@ -667,7 +677,8 @@ class LoadBalancerFlows(object): # Configure and enable keepalived in the amphora failover_LB_flow.add(self.amp_flows.get_vrrp_subflow( new_amp_role + '-' + constants.GET_VRRP_SUBFLOW, - timeout_dict, create_vrrp_group=False)) + timeout_dict, create_vrrp_group=False, + get_amphorae_status=False)) # #### End of standby #### diff --git a/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py b/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py index 0b752af09b..6235df9f4c 100644 --- a/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py +++ b/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py @@ -14,6 +14,9 @@ # import copy +from typing import List +from typing import Optional + from cryptography import fernet from oslo_config import cfg from oslo_log import log as logging @@ -102,10 +105,19 @@ class AmpListenersUpdate(BaseAmphoraTask): class AmphoraIndexListenerUpdate(BaseAmphoraTask): """Task to update the listeners on one amphora.""" - def execute(self, loadbalancer, amphora_index, amphorae, timeout_dict=()): + def execute(self, loadbalancer, amphora_index, amphorae, + amphorae_status: dict, timeout_dict=()): # Note, we don't want this to cause a revert as it may be used # in a failover flow with both amps failing. Skip it and let # health manager fix it. + + amphora_id = amphorae[amphora_index].get(constants.ID) + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping listener update because amphora %s " + "is not reachable.", amphora_id) + return + try: # TODO(johnsom) Optimize this to use the dicts and not need the # DB lookups @@ -120,7 +132,6 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask): self.amphora_driver.update_amphora_listeners( db_lb, db_amp, timeout_dict) except Exception as e: - amphora_id = amphorae[amphora_index].get(constants.ID) LOG.error('Failed to update listeners on amphora %s. Skipping ' 'this amphora as it is failing to update due to: %s', amphora_id, str(e)) @@ -193,10 +204,18 @@ class AmphoraIndexListenersReload(BaseAmphoraTask): """Task to reload all listeners on an amphora.""" def execute(self, loadbalancer, amphora_index, amphorae, - timeout_dict=None): + amphorae_status: dict, timeout_dict=None): """Execute listener reload routines for listeners on an amphora.""" if amphorae is None: return + + amphora_id = amphorae[amphora_index].get(constants.ID) + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping listener reload because amphora %s " + "is not reachable.", amphora_id) + return + # TODO(johnsom) Optimize this to use the dicts and not need the # DB lookups session = db_apis.get_session() @@ -210,7 +229,6 @@ class AmphoraIndexListenersReload(BaseAmphoraTask): try: self.amphora_driver.reload(db_lb, db_amp, timeout_dict) except Exception as e: - amphora_id = amphorae[amphora_index][constants.ID] LOG.warning('Failed to reload listeners on amphora %s. ' 'Skipping this amphora as it is failing to ' 'reload due to: %s', amphora_id, str(e)) @@ -478,8 +496,15 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask): class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask): """Task to get and update the VRRP interface device name from amphora.""" - def execute(self, amphora_index, amphorae, timeout_dict=None): + def execute(self, amphora_index, amphorae, amphorae_status: dict, + timeout_dict=None): amphora_id = amphorae[amphora_index][constants.ID] + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping VRRP interface update because amphora %s " + "is not reachable.", amphora_id) + return None + try: # TODO(johnsom) Optimize this to use the dicts and not need the # DB lookups @@ -542,12 +567,19 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask): """Task to update the VRRP configuration of an amphora.""" def execute(self, loadbalancer_id, amphorae_network_config, amphora_index, - amphorae, amp_vrrp_int, timeout_dict=None): + amphorae, amphorae_status: dict, amp_vrrp_int: Optional[str], + timeout_dict=None): """Execute update_vrrp_conf.""" # Note, we don't want this to cause a revert as it may be used # in a failover flow with both amps failing. Skip it and let # health manager fix it. amphora_id = amphorae[amphora_index][constants.ID] + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping VRRP configuration because amphora %s " + "is not reachable.", amphora_id) + return + try: # TODO(johnsom) Optimize this to use the dicts and not need the # DB lookups @@ -594,10 +626,17 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask): This will reload keepalived if it is already running. """ - def execute(self, amphora_index, amphorae, timeout_dict=None): + def execute(self, amphora_index, amphorae, amphorae_status: dict, + timeout_dict=None): # TODO(johnsom) Optimize this to use the dicts and not need the # DB lookups amphora_id = amphorae[amphora_index][constants.ID] + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping VRRP start because amphora %s " + "is not reachable.", amphora_id) + return + session = db_apis.get_session() with session.begin(): db_amp = self.amphora_repo.get(session, id=amphora_id) @@ -669,3 +708,40 @@ class AmphoraConfigUpdate(BaseAmphoraTask): 'update. Please update the amphora image for this ' 'amphora. Skipping.'. format(amphora.get(constants.ID))) + + +class AmphoraeGetConnectivityStatus(BaseAmphoraTask): + """Task that checks amphorae connectivity status. + + Check and return the connectivity status of both amphorae in ACTIVE STANDBY + load balancers + """ + + def execute(self, amphorae: List[dict], new_amphora_id: str, + timeout_dict=None): + amphorae_status = {} + + for amphora in amphorae: + amphora_id = amphora[constants.ID] + amphorae_status[amphora_id] = {} + + session = db_apis.get_session() + with session.begin(): + db_amp = self.amphora_repo.get(session, id=amphora_id) + + try: + # Verify if the amphora is reachable + self.amphora_driver.check(db_amp, timeout_dict=timeout_dict) + except Exception as e: + LOG.exception("Cannot get status for amphora %s", + amphora_id) + # In case it fails and the tested amphora is the newly created + # amphora, it's not a normal error handling, re-raise the + # exception + if amphora_id == new_amphora_id: + raise e + amphorae_status[amphora_id][constants.UNREACHABLE] = True + else: + amphorae_status[amphora_id][constants.UNREACHABLE] = False + + return amphorae_status diff --git a/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py b/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py index 928446e96d..6cdd1360c0 100644 --- a/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py +++ b/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py @@ -76,9 +76,9 @@ class TestHAProxyAmphoraDriver(base.TestCase): mock_api_version.reset_mock() client_mock.reset_mock() - result = self.driver.get_interface_from_ip(amphora_mock, IP_ADDRESS) - - self.assertIsNone(result) + self.assertRaises( + exc.NotFound, + self.driver.get_interface_from_ip, amphora_mock, IP_ADDRESS) mock_api_version.assert_called_once_with(amphora_mock, None) client_mock.get_interface.assert_called_once_with( amphora_mock, IP_ADDRESS, None, log_error=False) diff --git a/octavia/tests/unit/controller/worker/v2/flows/test_amphora_flows.py b/octavia/tests/unit/controller/worker/v2/flows/test_amphora_flows.py index 3591d09cf4..ab4a91d577 100644 --- a/octavia/tests/unit/controller/worker/v2/flows/test_amphora_flows.py +++ b/octavia/tests/unit/controller/worker/v2/flows/test_amphora_flows.py @@ -286,6 +286,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORA, amp_flow.provides) self.assertIn(constants.AMPHORA_ID, amp_flow.provides) self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.BASE_PORT, amp_flow.provides) self.assertIn(constants.COMPUTE_ID, amp_flow.provides) @@ -296,7 +297,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.VIP_SG_ID, amp_flow.provides) self.assertEqual(8, len(amp_flow.requires)) - self.assertEqual(13, len(amp_flow.provides)) + self.assertEqual(14, len(amp_flow.provides)) def test_get_failover_flow_standalone(self, mock_get_net_driver): failed_amphora = data_models.Amphora( @@ -320,6 +321,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORA, amp_flow.provides) self.assertIn(constants.AMPHORA_ID, amp_flow.provides) self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.BASE_PORT, amp_flow.provides) self.assertIn(constants.COMPUTE_ID, amp_flow.provides) @@ -330,7 +332,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.VIP_SG_ID, amp_flow.provides) self.assertEqual(8, len(amp_flow.requires)) - self.assertEqual(12, len(amp_flow.provides)) + self.assertEqual(13, len(amp_flow.provides)) def test_get_failover_flow_bogus_role(self, mock_get_net_driver): failed_amphora = data_models.Amphora(id=uuidutils.generate_uuid(), @@ -368,12 +370,30 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides) self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides) + self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.provides) self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) + self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires) + + self.assertEqual(3, len(vrrp_subflow.provides)) + self.assertEqual(3, len(vrrp_subflow.requires)) + + def test_get_vrrp_subflow_dont_get_status(self, mock_get_net_driver): + vrrp_subflow = self.AmpFlow.get_vrrp_subflow('123', + get_amphorae_status=False) + + self.assertIsInstance(vrrp_subflow, flow.Flow) + + self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides) + self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides) + + self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) + self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) + self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.requires) self.assertEqual(2, len(vrrp_subflow.provides)) - self.assertEqual(2, len(vrrp_subflow.requires)) + self.assertEqual(3, len(vrrp_subflow.requires)) def test_get_vrrp_subflow_dont_create_vrrp_group( self, mock_get_net_driver): @@ -384,12 +404,14 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides) self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides) + self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.provides) self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) + self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires) - self.assertEqual(2, len(vrrp_subflow.provides)) - self.assertEqual(2, len(vrrp_subflow.requires)) + self.assertEqual(3, len(vrrp_subflow.provides)) + self.assertEqual(3, len(vrrp_subflow.requires)) def test_update_amphora_config_flow(self, mock_get_net_driver): diff --git a/octavia/tests/unit/controller/worker/v2/flows/test_load_balancer_flows.py b/octavia/tests/unit/controller/worker/v2/flows/test_load_balancer_flows.py index 9ec85a30c9..05388895fd 100644 --- a/octavia/tests/unit/controller/worker/v2/flows/test_load_balancer_flows.py +++ b/octavia/tests/unit/controller/worker/v2/flows/test_load_balancer_flows.py @@ -199,14 +199,16 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires) self.assertIn(constants.UPDATE_DICT, amp_flow.requires) + self.assertIn(constants.AMPHORA_ID, amp_flow.requires) self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.LOADBALANCER, amp_flow.provides) - self.assertEqual(2, len(amp_flow.requires), amp_flow.requires) - self.assertEqual(4, len(amp_flow.provides), amp_flow.provides) + self.assertEqual(3, len(amp_flow.requires), amp_flow.requires) + self.assertEqual(5, len(amp_flow.provides), amp_flow.provides) amp_flow = self.LBFlow.get_post_lb_amp_association_flow( '123', constants.TOPOLOGY_ACTIVE_STANDBY) @@ -215,14 +217,16 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires) self.assertIn(constants.UPDATE_DICT, amp_flow.requires) + self.assertIn(constants.AMPHORA_ID, amp_flow.requires) self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides) self.assertIn(constants.LOADBALANCER, amp_flow.provides) - self.assertEqual(2, len(amp_flow.requires), amp_flow.requires) - self.assertEqual(4, len(amp_flow.provides), amp_flow.provides) + self.assertEqual(3, len(amp_flow.requires), amp_flow.requires) + self.assertEqual(5, len(amp_flow.provides), amp_flow.provides) @mock.patch('octavia.common.rpc.NOTIFIER', new_callable=MockNOTIFIER) @@ -285,6 +289,7 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.AMPHORA_ID, create_flow.provides) self.assertIn(constants.AMPHORA_NETWORK_CONFIG, create_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, create_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, create_flow.provides) self.assertIn(constants.COMPUTE_ID, create_flow.provides) self.assertIn(constants.COMPUTE_OBJ, create_flow.provides) self.assertIn(constants.DELTAS, create_flow.provides) @@ -296,7 +301,7 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.ADDITIONAL_VIPS, create_flow.provides) self.assertEqual(6, len(create_flow.requires), create_flow.requires) - self.assertEqual(17, len(create_flow.provides), + self.assertEqual(18, len(create_flow.provides), create_flow.provides) def _test_get_failover_LB_flow_single(self, amphorae): diff --git a/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py b/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py index f79af6c8bd..9d814277ae 100644 --- a/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py +++ b/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py @@ -132,17 +132,35 @@ class TestAmphoraDriverTasks(base.TestCase): mock_amphora_repo_get.return_value = _db_amphora_mock mock_lb_get.return_value = _db_load_balancer_mock + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: False + } + } + amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate() amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], - self.timeout_dict) + amphorae_status, self.timeout_dict) mock_driver.update_amphora_listeners.assert_called_once_with( _db_load_balancer_mock, _db_amphora_mock, self.timeout_dict) + # Unreachable amp + mock_driver.reset_mock() + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: True + } + } + amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], + amphorae_status, self.timeout_dict) + mock_driver.update_amphora_listeners.assert_not_called() + + # Test exception mock_driver.update_amphora_listeners.side_effect = Exception('boom') - amp_list_update_obj.execute(_LB_mock, 0, - [_amphora_mock], self.timeout_dict) + amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], {}, + self.timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, AMP_ID, status=constants.ERROR) @@ -198,37 +216,54 @@ class TestAmphoraDriverTasks(base.TestCase): mock_driver, mock_generate_uuid, mock_log, mock_get_session, mock_listener_repo_get, mock_listener_repo_update, mock_amphora_repo_get, mock_amphora_repo_update): - amphora_mock = mock.MagicMock() listeners_reload_obj = ( amphora_driver_tasks.AmphoraIndexListenersReload()) mock_lb = mock.MagicMock() mock_listener = mock.MagicMock() mock_listener.id = '12345' - mock_amphora_repo_get.return_value = amphora_mock + mock_amphora_repo_get.return_value = _amphora_mock mock_lb_repo_get.return_value = mock_lb mock_driver.reload.side_effect = [mock.DEFAULT, Exception('boom')] # Test no listeners mock_lb.listeners = None - listeners_reload_obj.execute(mock_lb, 0, None) + listeners_reload_obj.execute(mock_lb, 0, None, {}) mock_driver.reload.assert_not_called() # Test with listeners - mock_driver.start.reset_mock() + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: False + } + } + mock_driver.reload.reset_mock() mock_lb.listeners = [mock_listener] - listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], + listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], + amphorae_status, timeout_dict=self.timeout_dict) - mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, + mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock, self.timeout_dict) + # Unreachable amp + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: True + } + } + mock_driver.reload.reset_mock() + listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], + amphorae_status, + timeout_dict=self.timeout_dict) + mock_driver.reload.assert_not_called() + # Test with reload exception mock_driver.reload.reset_mock() - listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], + listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], {}, timeout_dict=self.timeout_dict) - mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, + mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock, self.timeout_dict) mock_amphora_repo_update.assert_called_once_with( - _session_mock, amphora_mock[constants.ID], + _session_mock, _amphora_mock[constants.ID], status=constants.ERROR) @mock.patch('octavia.controller.worker.task_utils.TaskUtils.' @@ -827,6 +862,11 @@ class TestAmphoraDriverTasks(base.TestCase): FAKE_INTERFACE = 'fake0' mock_driver.get_interface_from_ip.side_effect = [FAKE_INTERFACE, Exception('boom')] + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: False + } + } timeout_dict = {constants.CONN_MAX_RETRIES: CONN_MAX_RETRIES, constants.CONN_RETRY_INTERVAL: CONN_RETRY_INTERVAL} @@ -834,17 +874,28 @@ class TestAmphoraDriverTasks(base.TestCase): amphora_update_vrrp_interface_obj = ( amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface()) amphora_update_vrrp_interface_obj.execute( - 0, [_amphora_mock], timeout_dict) + 0, [_amphora_mock], amphorae_status, timeout_dict) mock_driver.get_interface_from_ip.assert_called_once_with( _db_amphora_mock, _db_amphora_mock.vrrp_ip, timeout_dict=timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, _db_amphora_mock.id, vrrp_interface=FAKE_INTERFACE) + # Unreachable amp + mock_driver.reset_mock() + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: True + } + } + amphora_update_vrrp_interface_obj.execute( + 0, [_amphora_mock], amphorae_status, timeout_dict) + mock_driver.get_interface_from_ip.assert_not_called() + # Test with an exception mock_amphora_repo_update.reset_mock() amphora_update_vrrp_interface_obj.execute( - 0, [_amphora_mock], timeout_dict) + 0, [_amphora_mock], {}, timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, _db_amphora_mock.id, status=constants.ERROR) @@ -895,20 +946,40 @@ class TestAmphoraDriverTasks(base.TestCase): Exception('boom')] mock_lb_get.return_value = _db_load_balancer_mock mock_amphora_repo_get.return_value = _db_amphora_mock + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: False + } + } + amphora_vrrp_update_obj = ( amphora_driver_tasks.AmphoraIndexVRRPUpdate()) amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, - 0, [_amphora_mock], 'fakeint0', + 0, [_amphora_mock], amphorae_status, + 'fakeint0', timeout_dict=self.timeout_dict) mock_driver.update_vrrp_conf.assert_called_once_with( _db_load_balancer_mock, amphorae_network_config, _db_amphora_mock, self.timeout_dict) + # Unreachable amp + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: True + } + } + mock_amphora_repo_update.reset_mock() + mock_driver.update_vrrp_conf.reset_mock() + amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, + 0, [_amphora_mock], amphorae_status, + None) + mock_driver.update_vrrp_conf.assert_not_called() + # Test with an exception mock_amphora_repo_update.reset_mock() amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, - 0, [_amphora_mock], 'fakeint0') + 0, [_amphora_mock], {}, 'fakeint0') mock_amphora_repo_update.assert_called_once_with( _session_mock, _db_amphora_mock.id, status=constants.ERROR) @@ -939,19 +1010,36 @@ class TestAmphoraDriverTasks(base.TestCase): mock_amphora_repo_get, mock_amphora_repo_update): mock_amphora_repo_get.return_value = _db_amphora_mock + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: False + } + } + amphora_vrrp_start_obj = ( amphora_driver_tasks.AmphoraIndexVRRPStart()) mock_driver.start_vrrp_service.side_effect = [mock.DEFAULT, Exception('boom')] - amphora_vrrp_start_obj.execute(0, [_amphora_mock], + amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status, timeout_dict=self.timeout_dict) mock_driver.start_vrrp_service.assert_called_once_with( _db_amphora_mock, self.timeout_dict) + # Unreachable amp + mock_driver.start_vrrp_service.reset_mock() + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: True + } + } + amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status, + timeout_dict=self.timeout_dict) + mock_driver.start_vrrp_service.assert_not_called() + # Test with a start exception mock_driver.start_vrrp_service.reset_mock() - amphora_vrrp_start_obj.execute(0, [_amphora_mock], + amphora_vrrp_start_obj.execute(0, [_amphora_mock], {}, timeout_dict=self.timeout_dict) mock_driver.start_vrrp_service.assert_called_once_with( _db_amphora_mock, self.timeout_dict) @@ -1029,3 +1117,74 @@ class TestAmphoraDriverTasks(base.TestCase): self.assertRaises(driver_except.TimeOutException, amp_config_update_obj.execute, _amphora_mock, flavor) + + def test_amphorae_get_connectivity_status(self, + mock_driver, + mock_generate_uuid, + mock_log, + mock_get_session, + mock_listener_repo_get, + mock_listener_repo_update, + mock_amphora_repo_get, + mock_amphora_repo_update): + amphora1_mock = mock.MagicMock() + amphora1_mock[constants.ID] = 'id1' + amphora2_mock = mock.MagicMock() + amphora2_mock[constants.ID] = 'id2' + db_amphora1_mock = mock.Mock() + db_amphora2_mock = mock.Mock() + + amp_get_connectivity_status = ( + amphora_driver_tasks.AmphoraeGetConnectivityStatus()) + + # All amphorae reachable + mock_amphora_repo_get.side_effect = [ + db_amphora1_mock, + db_amphora2_mock] + mock_driver.check.return_value = None + + ret = amp_get_connectivity_status.execute( + [amphora1_mock, amphora2_mock], + amphora1_mock[constants.ID], + timeout_dict=self.timeout_dict) + mock_driver.check.assert_has_calls( + [mock.call(db_amphora1_mock, timeout_dict=self.timeout_dict), + mock.call(db_amphora2_mock, timeout_dict=self.timeout_dict)]) + self.assertFalse( + ret[amphora1_mock[constants.ID]][constants.UNREACHABLE]) + self.assertFalse( + ret[amphora2_mock[constants.ID]][constants.UNREACHABLE]) + + # amphora1 unreachable + mock_driver.check.reset_mock() + mock_amphora_repo_get.side_effect = [ + db_amphora1_mock, + db_amphora2_mock] + mock_driver.check.side_effect = [ + driver_except.TimeOutException, None] + self.assertRaises(driver_except.TimeOutException, + amp_get_connectivity_status.execute, + [amphora1_mock, amphora2_mock], + amphora1_mock[constants.ID], + timeout_dict=self.timeout_dict) + mock_driver.check.assert_called_with( + db_amphora1_mock, timeout_dict=self.timeout_dict) + + # amphora2 unreachable + mock_driver.check.reset_mock() + mock_amphora_repo_get.side_effect = [ + db_amphora1_mock, + db_amphora2_mock] + mock_driver.check.side_effect = [ + None, driver_except.TimeOutException] + ret = amp_get_connectivity_status.execute( + [amphora1_mock, amphora2_mock], + amphora1_mock[constants.ID], + timeout_dict=self.timeout_dict) + mock_driver.check.assert_has_calls( + [mock.call(db_amphora1_mock, timeout_dict=self.timeout_dict), + mock.call(db_amphora2_mock, timeout_dict=self.timeout_dict)]) + self.assertFalse( + ret[amphora1_mock[constants.ID]][constants.UNREACHABLE]) + self.assertTrue( + ret[amphora2_mock[constants.ID]][constants.UNREACHABLE]) diff --git a/releasenotes/notes/reduce-duration-failover-636032433984d911.yaml b/releasenotes/notes/reduce-duration-failover-636032433984d911.yaml new file mode 100644 index 0000000000..21d5718d6e --- /dev/null +++ b/releasenotes/notes/reduce-duration-failover-636032433984d911.yaml @@ -0,0 +1,7 @@ +--- +fixes: + - | + Reduce the duration of the failovers of ACTIVE_STANDBY load balancers. Many + updates of an unreachable amphora may have been attempted during a + failover, now if an amphora is not reachable at the first update, the other + updates are skipped.