diff --git a/octavia/amphorae/drivers/driver_base.py b/octavia/amphorae/drivers/driver_base.py index 8a4505e55c..6b83adf69d 100644 --- a/octavia/amphorae/drivers/driver_base.py +++ b/octavia/amphorae/drivers/driver_base.py @@ -14,6 +14,9 @@ # under the License. import abc +from typing import Optional + +from octavia.db import models as db_models class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta): @@ -231,6 +234,19 @@ class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta): :type timeout_dict: dict """ + @abc.abstractmethod + def check(self, amphora: db_models.Amphora, + timeout_dict: Optional[dict] = None): + """Check connectivity to the amphora. + + :param amphora: The amphora to query. + :param timeout_dict: Dictionary of timeout values for calls to the + amphora. May contain: req_conn_timeout, + req_read_timeout, conn_max_retries, + conn_retry_interval + :raises TimeOutException: The amphora didn't reply + """ + class VRRPDriverMixin(object, metaclass=abc.ABCMeta): """Abstract mixin class for VRRP support in loadbalancer amphorae diff --git a/octavia/amphorae/drivers/haproxy/rest_api_driver.py b/octavia/amphorae/drivers/haproxy/rest_api_driver.py index 234e769682..482694c7e2 100644 --- a/octavia/amphorae/drivers/haproxy/rest_api_driver.py +++ b/octavia/amphorae/drivers/haproxy/rest_api_driver.py @@ -17,6 +17,7 @@ import hashlib import os import ssl import time +from typing import Optional import warnings from oslo_context import context as oslo_context @@ -38,6 +39,7 @@ from octavia.common.jinja.lvs import jinja_cfg as jinja_udp_cfg from octavia.common.tls_utils import cert_parser from octavia.common import utils from octavia.db import api as db_apis +from octavia.db import models as db_models from octavia.db import repositories as repo from octavia.network import data_models as network_models @@ -115,6 +117,11 @@ class HaproxyAmphoraLoadBalancerDriver( amphora.id, amphora.api_version) return list(map(int, amphora.api_version.split('.'))) + def check(self, amphora: db_models.Amphora, + timeout_dict: Optional[dict] = None): + """Check connectivity to the amphora.""" + self._populate_amphora_api_version(amphora, timeout_dict) + def update_amphora_listeners(self, loadbalancer, amphora, timeout_dict=None): """Update the amphora with a new configuration. @@ -635,15 +642,15 @@ class HaproxyAmphoraLoadBalancerDriver( req_read_timeout, conn_max_retries, conn_retry_interval :type timeout_dict: dict - :returns: None if not found, the interface name string if found. + :returns: the interface name string if found. + :raises octavia.amphorae.drivers.haproxy.exceptions.NotFound: + No interface found on the amphora + :raises TimeOutException: The amphora didn't reply """ - try: - self._populate_amphora_api_version(amphora, timeout_dict) - response_json = self.clients[amphora.api_version].get_interface( - amphora, ip_address, timeout_dict, log_error=False) - return response_json.get('interface', None) - except (exc.NotFound, driver_except.TimeOutException): - return None + self._populate_amphora_api_version(amphora, timeout_dict) + response_json = self.clients[amphora.api_version].get_interface( + amphora, ip_address, timeout_dict, log_error=False) + return response_json.get('interface', None) # Check a custom hostname diff --git a/octavia/amphorae/drivers/noop_driver/driver.py b/octavia/amphorae/drivers/noop_driver/driver.py index 8d6bc52def..77b4bf04e0 100644 --- a/octavia/amphorae/drivers/noop_driver/driver.py +++ b/octavia/amphorae/drivers/noop_driver/driver.py @@ -195,3 +195,6 @@ class NoopAmphoraLoadBalancerDriver( def reload_vrrp_service(self, loadbalancer): pass + + def check(self, amphora, timeout_dict=None): + pass diff --git a/octavia/common/constants.py b/octavia/common/constants.py index f3a17d3d0b..2d303a010f 100644 --- a/octavia/common/constants.py +++ b/octavia/common/constants.py @@ -313,6 +313,7 @@ AMPHORA_INDEX = 'amphora_index' AMPHORA_NETWORK_CONFIG = 'amphora_network_config' AMPHORAE = 'amphorae' AMPHORAE_NETWORK_CONFIG = 'amphorae_network_config' +AMPHORAE_STATUS = 'amphorae_status' AMPS_DATA = 'amps_data' ANTI_AFFINITY = 'anti-affinity' ATTEMPT_NUMBER = 'attempt_number' @@ -384,6 +385,7 @@ MESSAGE = 'message' NAME = 'name' NETWORK = 'network' NETWORK_ID = 'network_id' +NEW_AMPHORA_ID = 'new_amphora_id' NEXTHOP = 'nexthop' NICS = 'nics' OBJECT = 'object' @@ -430,6 +432,7 @@ TLS_CERTIFICATE_ID = 'tls_certificate_id' TLS_CONTAINER_ID = 'tls_container_id' TOPOLOGY = 'topology' TOTAL_CONNECTIONS = 'total_connections' +UNREACHABLE = 'unreachable' UPDATED_AT = 'updated_at' UPDATE_DICT = 'update_dict' UPDATED_PORTS = 'updated_ports' @@ -557,6 +560,7 @@ ADMIN_DOWN_PORT = 'admin-down-port' AMPHORA_POST_VIP_PLUG = 'amphora-post-vip-plug' AMPHORA_RELOAD_LISTENER = 'amphora-reload-listener' AMPHORA_TO_ERROR_ON_REVERT = 'amphora-to-error-on-revert' +AMPHORAE_GET_CONNECTIVITY_STATUS = 'amphorae-get-connectivity-status' AMPHORAE_POST_NETWORK_PLUG = 'amphorae-post-network-plug' ATTACH_PORT = 'attach-port' CALCULATE_AMPHORA_DELTA = 'calculate-amphora-delta' diff --git a/octavia/controller/worker/v1/flows/amphora_flows.py b/octavia/controller/worker/v1/flows/amphora_flows.py index ac8e37655d..aaaff1e6db 100644 --- a/octavia/controller/worker/v1/flows/amphora_flows.py +++ b/octavia/controller/worker/v1/flows/amphora_flows.py @@ -240,7 +240,8 @@ class AmphoraFlows(object): return delete_amphora_flow def get_vrrp_subflow(self, prefix, timeout_dict=None, - create_vrrp_group=True): + create_vrrp_group=True, + get_amphorae_status=True): sf_name = prefix + '-' + constants.GET_VRRP_SUBFLOW vrrp_subflow = linear_flow.Flow(sf_name) @@ -256,6 +257,17 @@ class AmphoraFlows(object): requires=constants.LOADBALANCER_ID, provides=constants.AMPHORAE_NETWORK_CONFIG)) + if get_amphorae_status: + # Get the amphorae_status dict in case the caller hasn't fetched + # it yet. + vrrp_subflow.add( + amphora_driver_tasks.AmphoraeGetConnectivityStatus( + name=constants.AMPHORAE_GET_CONNECTIVITY_STATUS, + requires=constants.AMPHORAE, + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, + inject={constants.TIMEOUT_DICT: timeout_dict}, + provides=constants.AMPHORAE_STATUS)) + # VRRP update needs to be run on all amphora to update # their peer configurations. So parallelize this with an # unordered subflow. @@ -266,7 +278,7 @@ class AmphoraFlows(object): amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict}, provides=constants.AMP_VRRP_INT)) @@ -275,13 +287,13 @@ class AmphoraFlows(object): name=sf_name + '-0-' + constants.AMP_VRRP_UPDATE, requires=(constants.LOADBALANCER_ID, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, - constants.AMP_VRRP_INT), + constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( name=sf_name + '-0-' + constants.AMP_VRRP_START, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) @@ -289,7 +301,7 @@ class AmphoraFlows(object): amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict}, provides=constants.AMP_VRRP_INT)) @@ -298,12 +310,12 @@ class AmphoraFlows(object): name=sf_name + '-1-' + constants.AMP_VRRP_UPDATE, requires=(constants.LOADBALANCER_ID, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, - constants.AMP_VRRP_INT), + constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( name=sf_name + '-1-' + constants.AMP_VRRP_START, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) @@ -551,6 +563,14 @@ class AmphoraFlows(object): constants.CONN_RETRY_INTERVAL: CONF.haproxy_amphora.active_connection_retry_interval} + failover_amp_flow.add( + amphora_driver_tasks.AmphoraeGetConnectivityStatus( + name=constants.AMPHORAE_GET_CONNECTIVITY_STATUS, + requires=constants.AMPHORAE, + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, + inject={constants.TIMEOUT_DICT: timeout_dict}, + provides=constants.AMPHORAE_STATUS)) + # Listeners update needs to be run on all amphora to update # their peer configurations. So parallelize this with an # unordered subflow. @@ -561,7 +581,8 @@ class AmphoraFlows(object): update_amps_subflow.add( amphora_driver_tasks.AmphoraIndexListenerUpdate( name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE, - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: amp_index, constants.TIMEOUT_DICT: timeout_dict})) @@ -571,7 +592,8 @@ class AmphoraFlows(object): if lb_amp_count == 2: failover_amp_flow.add( self.get_vrrp_subflow(constants.GET_VRRP_SUBFLOW, - timeout_dict, create_vrrp_group=False)) + timeout_dict, create_vrrp_group=False, + get_amphorae_status=False)) # Reload the listener. This needs to be done here because # it will create the required haproxy check scripts for @@ -587,7 +609,8 @@ class AmphoraFlows(object): amphora_driver_tasks.AmphoraIndexListenersReload( name=(str(amp_index) + '-' + constants.AMPHORA_RELOAD_LISTENER), - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: amp_index, constants.TIMEOUT_DICT: timeout_dict})) diff --git a/octavia/controller/worker/v1/flows/load_balancer_flows.py b/octavia/controller/worker/v1/flows/load_balancer_flows.py index 4fb87e8972..8630ff0473 100644 --- a/octavia/controller/worker/v1/flows/load_balancer_flows.py +++ b/octavia/controller/worker/v1/flows/load_balancer_flows.py @@ -621,6 +621,14 @@ class LoadBalancerFlows(object): requires=constants.LOADBALANCER_ID, provides=constants.AMPHORAE)) + failover_LB_flow.add( + amphora_driver_tasks.AmphoraeGetConnectivityStatus( + name=(new_amp_role + '-' + + constants.AMPHORAE_GET_CONNECTIVITY_STATUS), + requires=constants.AMPHORAE, + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, + provides=constants.AMPHORAE_STATUS)) + # Listeners update needs to be run on all amphora to update # their peer configurations. So parallelize this with an # unordered subflow. @@ -635,14 +643,16 @@ class LoadBalancerFlows(object): amphora_driver_tasks.AmphoraIndexListenerUpdate( name=(constants.AMPHORA + '-0-' + constants.AMP_LISTENER_UPDATE), - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) update_amps_subflow.add( amphora_driver_tasks.AmphoraIndexListenerUpdate( name=(constants.AMPHORA + '-1-' + constants.AMP_LISTENER_UPDATE), - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) @@ -651,7 +661,8 @@ class LoadBalancerFlows(object): # Configure and enable keepalived in the amphora failover_LB_flow.add(self.amp_flows.get_vrrp_subflow( new_amp_role + '-' + constants.GET_VRRP_SUBFLOW, - timeout_dict, create_vrrp_group=False)) + timeout_dict, create_vrrp_group=False, + get_amphorae_status=False)) # #### End of standby #### diff --git a/octavia/controller/worker/v1/tasks/amphora_driver_tasks.py b/octavia/controller/worker/v1/tasks/amphora_driver_tasks.py index beb3497588..9e62574310 100644 --- a/octavia/controller/worker/v1/tasks/amphora_driver_tasks.py +++ b/octavia/controller/worker/v1/tasks/amphora_driver_tasks.py @@ -12,6 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. # +from typing import List +from typing import Optional from cryptography import fernet from oslo_config import cfg @@ -73,10 +75,18 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask): """Task to update the listeners on one amphora.""" def execute(self, loadbalancer, amphora_index, amphorae, - timeout_dict=None): + amphorae_status: dict, timeout_dict=None): # Note, we don't want this to cause a revert as it may be used # in a failover flow with both amps failing. Skip it and let # health manager fix it. + + amphora_id = amphorae[amphora_index].id + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping listener update because amphora %s " + "is not reachable.", amphora_id) + return + try: # Make sure we have a fresh load balancer object loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(), @@ -84,7 +94,6 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask): self.amphora_driver.update_amphora_listeners( loadbalancer, amphorae[amphora_index], timeout_dict) except Exception as e: - amphora_id = amphorae[amphora_index].id LOG.error('Failed to update listeners on amphora %s. Skipping ' 'this amphora as it is failing to update due to: %s', amphora_id, str(e)) @@ -129,14 +138,23 @@ class AmphoraIndexListenersReload(BaseAmphoraTask): """Task to reload all listeners on an amphora.""" def execute(self, loadbalancer, amphora_index, amphorae, - timeout_dict=None): + amphorae_status: dict, timeout_dict=None): """Execute listener reload routines for listeners on an amphora.""" + if amphorae is None: + return + + amphora_id = amphorae[amphora_index].id + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping listener reload because amphora %s " + "is not reachable.", amphora_id) + return + if loadbalancer.listeners: try: self.amphora_driver.reload( loadbalancer, amphorae[amphora_index], timeout_dict) except Exception as e: - amphora_id = amphorae[amphora_index].id LOG.warning('Failed to reload listeners on amphora %s. ' 'Skipping this amphora as it is failing to ' 'reload due to: %s', amphora_id, str(e)) @@ -305,8 +323,15 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask): class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask): """Task to get and update the VRRP interface device name from amphora.""" - def execute(self, amphora_index, amphorae, timeout_dict=None): + def execute(self, amphora_index, amphorae, amphorae_status: dict, + timeout_dict=None): amphora_id = amphorae[amphora_index].id + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping VRRP interface update because amphora %s " + "is not reachable.", amphora_id) + return None + try: interface = self.amphora_driver.get_interface_from_ip( amphorae[amphora_index], amphorae[amphora_index].vrrp_ip, @@ -354,14 +379,21 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask): """Task to update the VRRP configuration of an amphora.""" def execute(self, loadbalancer_id, amphorae_network_config, amphora_index, - amphorae, amp_vrrp_int, timeout_dict=None): + amphorae, amphorae_status: dict, amp_vrrp_int: Optional[str], + timeout_dict=None): """Execute update_vrrp_conf.""" - loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(), - id=loadbalancer_id) # Note, we don't want this to cause a revert as it may be used # in a failover flow with both amps failing. Skip it and let # health manager fix it. amphora_id = amphorae[amphora_index].id + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping VRRP configuration because amphora %s " + "is not reachable.", amphora_id) + return + + loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(), + id=loadbalancer_id) amphorae[amphora_index].vrrp_interface = amp_vrrp_int try: self.amphora_driver.update_vrrp_conf( @@ -394,8 +426,15 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask): This will reload keepalived if it is already running. """ - def execute(self, amphora_index, amphorae, timeout_dict=None): + def execute(self, amphora_index, amphorae, amphorae_status: dict, + timeout_dict=None): amphora_id = amphorae[amphora_index].id + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping VRRP start because amphora %s " + "is not reachable.", amphora_id) + return + try: self.amphora_driver.start_vrrp_service(amphorae[amphora_index], timeout_dict) @@ -451,3 +490,40 @@ class AmphoraConfigUpdate(BaseAmphoraTask): LOG.error('Amphora %s does not support agent configuration ' 'update. Please update the amphora image for this ' 'amphora. Skipping.', amphora.id) + + +class AmphoraeGetConnectivityStatus(BaseAmphoraTask): + """Task that checks amphorae connectivity status. + + Check and return the connectivity status of both amphorae in ACTIVE STANDBY + load balancers + """ + + def execute(self, amphorae: List[dict], new_amphora_id: str, + timeout_dict=None): + amphorae_status = {} + + for amphora in amphorae: + amphora_id = amphora.id + amphorae_status[amphora_id] = {} + + session = db_apis.get_session() + with session.begin(): + db_amp = self.amphora_repo.get(session, id=amphora_id) + + try: + # Verify if the amphora is reachable + self.amphora_driver.check(db_amp, timeout_dict=timeout_dict) + except Exception as e: + LOG.exception("Cannot get status for amphora %s", + amphora_id) + # In case it fails and the tested amphora is the newly created + # amphora, it's not a normal error handling, re-raise the + # exception + if amphora_id == new_amphora_id: + raise e + amphorae_status[amphora_id][constants.UNREACHABLE] = True + else: + amphorae_status[amphora_id][constants.UNREACHABLE] = False + + return amphorae_status diff --git a/octavia/controller/worker/v2/flows/amphora_flows.py b/octavia/controller/worker/v2/flows/amphora_flows.py index 2883c13866..237fc6d4bb 100644 --- a/octavia/controller/worker/v2/flows/amphora_flows.py +++ b/octavia/controller/worker/v2/flows/amphora_flows.py @@ -226,7 +226,8 @@ class AmphoraFlows(object): return delete_amphora_flow def get_vrrp_subflow(self, prefix, timeout_dict=None, - create_vrrp_group=True): + create_vrrp_group=True, + get_amphorae_status=True): sf_name = prefix + '-' + constants.GET_VRRP_SUBFLOW vrrp_subflow = linear_flow.Flow(sf_name) @@ -242,6 +243,17 @@ class AmphoraFlows(object): requires=constants.LOADBALANCER_ID, provides=constants.AMPHORAE_NETWORK_CONFIG)) + if get_amphorae_status: + # Get the amphorae_status dict in case the caller hasn't fetched + # it yet. + vrrp_subflow.add( + amphora_driver_tasks.AmphoraeGetConnectivityStatus( + name=constants.AMPHORAE_GET_CONNECTIVITY_STATUS, + requires=constants.AMPHORAE, + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, + inject={constants.TIMEOUT_DICT: timeout_dict}, + provides=constants.AMPHORAE_STATUS)) + # VRRP update needs to be run on all amphora to update # their peer configurations. So parallelize this with an # unordered subflow. @@ -252,7 +264,7 @@ class AmphoraFlows(object): amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict}, provides=constants.AMP_VRRP_INT)) @@ -261,13 +273,13 @@ class AmphoraFlows(object): name=sf_name + '-0-' + constants.AMP_VRRP_UPDATE, requires=(constants.LOADBALANCER_ID, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, - constants.AMP_VRRP_INT), + constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( name=sf_name + '-0-' + constants.AMP_VRRP_START, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) @@ -275,7 +287,7 @@ class AmphoraFlows(object): amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict}, provides=constants.AMP_VRRP_INT)) @@ -284,12 +296,12 @@ class AmphoraFlows(object): name=sf_name + '-1-' + constants.AMP_VRRP_UPDATE, requires=(constants.LOADBALANCER_ID, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, - constants.AMP_VRRP_INT), + constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( name=sf_name + '-1-' + constants.AMP_VRRP_START, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) @@ -537,6 +549,14 @@ class AmphoraFlows(object): constants.CONN_RETRY_INTERVAL: CONF.haproxy_amphora.active_connection_retry_interval} + failover_amp_flow.add( + amphora_driver_tasks.AmphoraeGetConnectivityStatus( + name=constants.AMPHORAE_GET_CONNECTIVITY_STATUS, + requires=constants.AMPHORAE, + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, + inject={constants.TIMEOUT_DICT: timeout_dict}, + provides=constants.AMPHORAE_STATUS)) + # Listeners update needs to be run on all amphora to update # their peer configurations. So parallelize this with an # unordered subflow. @@ -547,7 +567,8 @@ class AmphoraFlows(object): update_amps_subflow.add( amphora_driver_tasks.AmphoraIndexListenerUpdate( name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE, - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: amp_index, constants.TIMEOUT_DICT: timeout_dict})) @@ -557,7 +578,8 @@ class AmphoraFlows(object): if lb_amp_count == 2: failover_amp_flow.add( self.get_vrrp_subflow(constants.GET_VRRP_SUBFLOW, - timeout_dict, create_vrrp_group=False)) + timeout_dict, create_vrrp_group=False, + get_amphorae_status=False)) # Reload the listener. This needs to be done here because # it will create the required haproxy check scripts for @@ -573,7 +595,8 @@ class AmphoraFlows(object): amphora_driver_tasks.AmphoraIndexListenersReload( name=(str(amp_index) + '-' + constants.AMPHORA_RELOAD_LISTENER), - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: amp_index, constants.TIMEOUT_DICT: timeout_dict})) diff --git a/octavia/controller/worker/v2/flows/load_balancer_flows.py b/octavia/controller/worker/v2/flows/load_balancer_flows.py index 8c6b48905f..93b0037247 100644 --- a/octavia/controller/worker/v2/flows/load_balancer_flows.py +++ b/octavia/controller/worker/v2/flows/load_balancer_flows.py @@ -611,6 +611,14 @@ class LoadBalancerFlows(object): requires=constants.LOADBALANCER_ID, provides=constants.AMPHORAE)) + failover_LB_flow.add( + amphora_driver_tasks.AmphoraeGetConnectivityStatus( + name=(new_amp_role + '-' + + constants.AMPHORAE_GET_CONNECTIVITY_STATUS), + requires=constants.AMPHORAE, + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, + provides=constants.AMPHORAE_STATUS)) + # Listeners update needs to be run on all amphora to update # their peer configurations. So parallelize this with an # unordered subflow. @@ -625,14 +633,16 @@ class LoadBalancerFlows(object): amphora_driver_tasks.AmphoraIndexListenerUpdate( name=(constants.AMPHORA + '-0-' + constants.AMP_LISTENER_UPDATE), - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) update_amps_subflow.add( amphora_driver_tasks.AmphoraIndexListenerUpdate( name=(constants.AMPHORA + '-1-' + constants.AMP_LISTENER_UPDATE), - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) @@ -641,7 +651,8 @@ class LoadBalancerFlows(object): # Configure and enable keepalived in the amphora failover_LB_flow.add(self.amp_flows.get_vrrp_subflow( new_amp_role + '-' + constants.GET_VRRP_SUBFLOW, - timeout_dict, create_vrrp_group=False)) + timeout_dict, create_vrrp_group=False, + get_amphorae_status=False)) # #### End of standby #### diff --git a/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py b/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py index 26585b14ed..eb312e5a14 100644 --- a/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py +++ b/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py @@ -14,6 +14,9 @@ # import copy +from typing import List +from typing import Optional + from cryptography import fernet from oslo_config import cfg from oslo_log import log as logging @@ -99,10 +102,19 @@ class AmpListenersUpdate(BaseAmphoraTask): class AmphoraIndexListenerUpdate(BaseAmphoraTask): """Task to update the listeners on one amphora.""" - def execute(self, loadbalancer, amphora_index, amphorae, timeout_dict=()): + def execute(self, loadbalancer, amphora_index, amphorae, + amphorae_status: dict, timeout_dict=()): # Note, we don't want this to cause a revert as it may be used # in a failover flow with both amps failing. Skip it and let # health manager fix it. + + amphora_id = amphorae[amphora_index].get(constants.ID) + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping listener update because amphora %s " + "is not reachable.", amphora_id) + return + try: # TODO(johnsom) Optimize this to use the dicts and not need the # DB lookups @@ -115,7 +127,6 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask): self.amphora_driver.update_amphora_listeners( db_lb, db_amp, timeout_dict) except Exception as e: - amphora_id = amphorae[amphora_index].get(constants.ID) LOG.error('Failed to update listeners on amphora %s. Skipping ' 'this amphora as it is failing to update due to: %s', amphora_id, str(e)) @@ -177,10 +188,18 @@ class AmphoraIndexListenersReload(BaseAmphoraTask): """Task to reload all listeners on an amphora.""" def execute(self, loadbalancer, amphora_index, amphorae, - timeout_dict=None): + amphorae_status: dict, timeout_dict=None): """Execute listener reload routines for listeners on an amphora.""" if amphorae is None: return + + amphora_id = amphorae[amphora_index].get(constants.ID) + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping listener reload because amphora %s " + "is not reachable.", amphora_id) + return + # TODO(johnsom) Optimize this to use the dicts and not need the # DB lookups db_amp = self.amphora_repo.get( @@ -192,7 +211,6 @@ class AmphoraIndexListenersReload(BaseAmphoraTask): try: self.amphora_driver.reload(db_lb, db_amp, timeout_dict) except Exception as e: - amphora_id = amphorae[amphora_index][constants.ID] LOG.warning('Failed to reload listeners on amphora %s. ' 'Skipping this amphora as it is failing to ' 'reload due to: %s', amphora_id, str(e)) @@ -421,8 +439,15 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask): class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask): """Task to get and update the VRRP interface device name from amphora.""" - def execute(self, amphora_index, amphorae, timeout_dict=None): + def execute(self, amphora_index, amphorae, amphorae_status: dict, + timeout_dict=None): amphora_id = amphorae[amphora_index][constants.ID] + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping VRRP interface update because amphora %s " + "is not reachable.", amphora_id) + return None + try: # TODO(johnsom) Optimize this to use the dicts and not need the # DB lookups @@ -478,12 +503,19 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask): """Task to update the VRRP configuration of an amphora.""" def execute(self, loadbalancer_id, amphorae_network_config, amphora_index, - amphorae, amp_vrrp_int, timeout_dict=None): + amphorae, amphorae_status: dict, amp_vrrp_int: Optional[str], + timeout_dict=None): """Execute update_vrrp_conf.""" # Note, we don't want this to cause a revert as it may be used # in a failover flow with both amps failing. Skip it and let # health manager fix it. amphora_id = amphorae[amphora_index][constants.ID] + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping VRRP configuration because amphora %s " + "is not reachable.", amphora_id) + return + try: # TODO(johnsom) Optimize this to use the dicts and not need the # DB lookups @@ -525,10 +557,17 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask): This will reload keepalived if it is already running. """ - def execute(self, amphora_index, amphorae, timeout_dict=None): + def execute(self, amphora_index, amphorae, amphorae_status: dict, + timeout_dict=None): # TODO(johnsom) Optimize this to use the dicts and not need the # DB lookups amphora_id = amphorae[amphora_index][constants.ID] + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping VRRP start because amphora %s " + "is not reachable.", amphora_id) + return + db_amp = self.amphora_repo.get(db_apis.get_session(), id=amphora_id) try: self.amphora_driver.start_vrrp_service(db_amp, timeout_dict) @@ -592,3 +631,40 @@ class AmphoraConfigUpdate(BaseAmphoraTask): 'update. Please update the amphora image for this ' 'amphora. Skipping.'. format(amphora.get(constants.ID))) + + +class AmphoraeGetConnectivityStatus(BaseAmphoraTask): + """Task that checks amphorae connectivity status. + + Check and return the connectivity status of both amphorae in ACTIVE STANDBY + load balancers + """ + + def execute(self, amphorae: List[dict], new_amphora_id: str, + timeout_dict=None): + amphorae_status = {} + + for amphora in amphorae: + amphora_id = amphora[constants.ID] + amphorae_status[amphora_id] = {} + + session = db_apis.get_session() + with session.begin(): + db_amp = self.amphora_repo.get(session, id=amphora_id) + + try: + # Verify if the amphora is reachable + self.amphora_driver.check(db_amp, timeout_dict=timeout_dict) + except Exception as e: + LOG.exception("Cannot get status for amphora %s", + amphora_id) + # In case it fails and the tested amphora is the newly created + # amphora, it's not a normal error handling, re-raise the + # exception + if amphora_id == new_amphora_id: + raise e + amphorae_status[amphora_id][constants.UNREACHABLE] = True + else: + amphorae_status[amphora_id][constants.UNREACHABLE] = False + + return amphorae_status diff --git a/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py b/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py index 8bef00ebea..428ee5aef6 100644 --- a/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py +++ b/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py @@ -75,9 +75,9 @@ class TestHAProxyAmphoraDriver(base.TestCase): mock_api_version.reset_mock() client_mock.reset_mock() - result = self.driver.get_interface_from_ip(amphora_mock, IP_ADDRESS) - - self.assertIsNone(result) + self.assertRaises( + exc.NotFound, + self.driver.get_interface_from_ip, amphora_mock, IP_ADDRESS) mock_api_version.assert_called_once_with(amphora_mock, None) client_mock.get_interface.assert_called_once_with( amphora_mock, IP_ADDRESS, None, log_error=False) diff --git a/octavia/tests/unit/controller/worker/v1/flows/test_amphora_flows.py b/octavia/tests/unit/controller/worker/v1/flows/test_amphora_flows.py index e9f61abf92..77ed34fe89 100644 --- a/octavia/tests/unit/controller/worker/v1/flows/test_amphora_flows.py +++ b/octavia/tests/unit/controller/worker/v1/flows/test_amphora_flows.py @@ -243,6 +243,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORA, amp_flow.provides) self.assertIn(constants.AMPHORA_ID, amp_flow.provides) self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.BASE_PORT, amp_flow.provides) self.assertIn(constants.COMPUTE_ID, amp_flow.provides) @@ -253,7 +254,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.VIP_SG_ID, amp_flow.provides) self.assertEqual(7, len(amp_flow.requires)) - self.assertEqual(13, len(amp_flow.provides)) + self.assertEqual(14, len(amp_flow.provides)) def test_get_failover_flow_standalone(self, mock_get_net_driver): failed_amphora = data_models.Amphora( @@ -276,6 +277,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORA, amp_flow.provides) self.assertIn(constants.AMPHORA_ID, amp_flow.provides) self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.BASE_PORT, amp_flow.provides) self.assertIn(constants.COMPUTE_ID, amp_flow.provides) @@ -286,7 +288,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.VIP_SG_ID, amp_flow.provides) self.assertEqual(7, len(amp_flow.requires)) - self.assertEqual(12, len(amp_flow.provides)) + self.assertEqual(13, len(amp_flow.provides)) def test_get_failover_flow_bogus_role(self, mock_get_net_driver): failed_amphora = data_models.Amphora(id=uuidutils.generate_uuid(), @@ -324,12 +326,30 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides) self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides) + self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.provides) self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) + self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires) + + self.assertEqual(3, len(vrrp_subflow.provides)) + self.assertEqual(3, len(vrrp_subflow.requires)) + + def test_get_vrrp_subflow_dont_get_status(self, mock_get_net_driver): + vrrp_subflow = self.AmpFlow.get_vrrp_subflow('123', + get_amphorae_status=False) + + self.assertIsInstance(vrrp_subflow, flow.Flow) + + self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides) + self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides) + + self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) + self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) + self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.requires) self.assertEqual(2, len(vrrp_subflow.provides)) - self.assertEqual(2, len(vrrp_subflow.requires)) + self.assertEqual(3, len(vrrp_subflow.requires)) def test_get_vrrp_subflow_dont_create_vrrp_group( self, mock_get_net_driver): @@ -340,12 +360,14 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides) self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides) + self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.provides) self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) + self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires) - self.assertEqual(2, len(vrrp_subflow.provides)) - self.assertEqual(2, len(vrrp_subflow.requires)) + self.assertEqual(3, len(vrrp_subflow.provides)) + self.assertEqual(3, len(vrrp_subflow.requires)) def test_get_post_map_lb_subflow(self, mock_get_net_driver): diff --git a/octavia/tests/unit/controller/worker/v1/flows/test_load_balancer_flows.py b/octavia/tests/unit/controller/worker/v1/flows/test_load_balancer_flows.py index e3ec1d9b5a..fd0c405f55 100644 --- a/octavia/tests/unit/controller/worker/v1/flows/test_load_balancer_flows.py +++ b/octavia/tests/unit/controller/worker/v1/flows/test_load_balancer_flows.py @@ -155,10 +155,16 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires) self.assertIn(constants.UPDATE_DICT, amp_flow.requires) + self.assertIn(constants.AMPHORA_ID, amp_flow.requires) + + self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) + self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides) + self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.LOADBALANCER, amp_flow.provides) - self.assertEqual(4, len(amp_flow.provides)) - self.assertEqual(2, len(amp_flow.requires)) + self.assertEqual(5, len(amp_flow.provides)) + self.assertEqual(3, len(amp_flow.requires)) amp_flow = self.LBFlow.get_post_lb_amp_association_flow( '123', constants.TOPOLOGY_ACTIVE_STANDBY) @@ -167,10 +173,16 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires) self.assertIn(constants.UPDATE_DICT, amp_flow.requires) + self.assertIn(constants.AMPHORA_ID, amp_flow.requires) + + self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) + self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) + self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides) self.assertIn(constants.LOADBALANCER, amp_flow.provides) - self.assertEqual(4, len(amp_flow.provides)) - self.assertEqual(2, len(amp_flow.requires)) + self.assertEqual(5, len(amp_flow.provides)) + self.assertEqual(3, len(amp_flow.requires)) def test_get_create_load_balancer_flows_single_listeners( self, mock_get_net_driver): @@ -219,6 +231,7 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.LISTENERS, create_flow.provides) self.assertIn(constants.AMPHORA, create_flow.provides) self.assertIn(constants.AMPHORA_ID, create_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, create_flow.provides) self.assertIn(constants.COMPUTE_ID, create_flow.provides) self.assertIn(constants.COMPUTE_OBJ, create_flow.provides) self.assertIn(constants.LOADBALANCER, create_flow.provides) @@ -230,7 +243,7 @@ class TestLoadBalancerFlows(base.TestCase): create_flow.provides) self.assertEqual(6, len(create_flow.requires)) - self.assertEqual(16, len(create_flow.provides), + self.assertEqual(17, len(create_flow.provides), create_flow.provides) def _test_get_failover_LB_flow_single(self, amphorae): @@ -322,6 +335,7 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.AMPHORA, failover_flow.provides) self.assertIn(constants.AMPHORA_ID, failover_flow.provides) self.assertIn(constants.AMPHORAE, failover_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, failover_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, failover_flow.provides) self.assertIn(constants.BASE_PORT, failover_flow.provides) @@ -339,7 +353,7 @@ class TestLoadBalancerFlows(base.TestCase): self.assertEqual(6, len(failover_flow.requires), failover_flow.requires) - self.assertEqual(16, len(failover_flow.provides), + self.assertEqual(17, len(failover_flow.provides), failover_flow.provides) def test_get_failover_LB_flow_no_amps_act_stdby(self, mock_get_net_driver): diff --git a/octavia/tests/unit/controller/worker/v1/tasks/test_amphora_driver_tasks.py b/octavia/tests/unit/controller/worker/v1/tasks/test_amphora_driver_tasks.py index 678108448a..b795703302 100644 --- a/octavia/tests/unit/controller/worker/v1/tasks/test_amphora_driver_tasks.py +++ b/octavia/tests/unit/controller/worker/v1/tasks/test_amphora_driver_tasks.py @@ -125,17 +125,37 @@ class TestAmphoraDriverTasks(base.TestCase): mock_amphora_repo_update): mock_lb_repo_get.return_value = _LB_mock + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: False + } + } + amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate() amp_list_update_obj.execute(_load_balancer_mock, 0, - [_amphora_mock], self.timeout_dict) + [_amphora_mock], amphorae_status, + self.timeout_dict) mock_driver.update_amphora_listeners.assert_called_once_with( _LB_mock, _amphora_mock, self.timeout_dict) + # Unreachable amp + mock_driver.reset_mock() + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: True + } + } + amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], + amphorae_status, self.timeout_dict) + mock_driver.update_amphora_listeners.assert_not_called() + + # Test exception mock_driver.update_amphora_listeners.side_effect = Exception('boom') amp_list_update_obj.execute(_load_balancer_mock, 0, - [_amphora_mock], self.timeout_dict) + [_amphora_mock], {}, + self.timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, AMP_ID, status=constants.ERROR) @@ -217,20 +237,38 @@ class TestAmphoraDriverTasks(base.TestCase): # Test no listeners mock_lb.listeners = None - listeners_reload_obj.execute(mock_lb, 0, None) + listeners_reload_obj.execute(mock_lb, 0, None, {}) mock_driver.reload.assert_not_called() # Test with listeners + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: False + } + } mock_driver.start.reset_mock() mock_lb.listeners = [mock_listener] listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], + amphorae_status, timeout_dict=self.timeout_dict) mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, self.timeout_dict) + # Unreachable amp + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: True + } + } + mock_driver.reload.reset_mock() + listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], + amphorae_status, + timeout_dict=self.timeout_dict) + mock_driver.reload.assert_not_called() + # Test with reload exception mock_driver.reload.reset_mock() - listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], + listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], {}, timeout_dict=self.timeout_dict) mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, self.timeout_dict) @@ -604,6 +642,11 @@ class TestAmphoraDriverTasks(base.TestCase): _LB_mock.amphorae = _amphorae_mock mock_driver.get_interface_from_ip.side_effect = [FAKE_INTERFACE, Exception('boom')] + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: False + } + } timeout_dict = {constants.CONN_MAX_RETRIES: CONN_MAX_RETRIES, constants.CONN_RETRY_INTERVAL: CONN_RETRY_INTERVAL} @@ -611,16 +654,27 @@ class TestAmphoraDriverTasks(base.TestCase): amphora_update_vrrp_interface_obj = ( amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface()) amphora_update_vrrp_interface_obj.execute( - 0, [_amphora_mock], timeout_dict) + 0, [_amphora_mock], amphorae_status, timeout_dict) mock_driver.get_interface_from_ip.assert_called_once_with( _amphora_mock, _amphora_mock.vrrp_ip, timeout_dict=timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, _amphora_mock.id, vrrp_interface=FAKE_INTERFACE) + # Unreachable amp + mock_driver.reset_mock() + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: True + } + } + amphora_update_vrrp_interface_obj.execute( + 0, [_amphora_mock], amphorae_status, timeout_dict) + mock_driver.get_interface_from_ip.assert_not_called() + # Test with an exception mock_amphora_repo_update.reset_mock() amphora_update_vrrp_interface_obj.execute( - 0, [_amphora_mock], timeout_dict) + 0, [_amphora_mock], {}, timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, _amphora_mock.id, status=constants.ERROR) @@ -666,20 +720,41 @@ class TestAmphoraDriverTasks(base.TestCase): mock_driver.update_vrrp_conf.side_effect = [mock.DEFAULT, Exception('boom')] mock_lb_get.return_value = _LB_mock + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: False + } + } + amphora_vrrp_update_obj = ( amphora_driver_tasks.AmphoraIndexVRRPUpdate()) amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config, - 0, [_amphora_mock], 'fakeint0', + 0, [_amphora_mock], amphorae_status, + 'fakeint0', timeout_dict=self.timeout_dict) mock_driver.update_vrrp_conf.assert_called_once_with( _LB_mock, amphorae_network_config, _amphora_mock, self.timeout_dict) + # Unreachable amp + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: True + } + } + mock_amphora_repo_update.reset_mock() + mock_driver.update_vrrp_conf.reset_mock() + amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, + 0, [_amphora_mock], amphorae_status, + None) + mock_driver.update_vrrp_conf.assert_not_called() + # Test with an exception mock_amphora_repo_update.reset_mock() amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config, - 0, [_amphora_mock], 'fakeint0') + 0, [_amphora_mock], {}, + 'fakeint0') mock_amphora_repo_update.assert_called_once_with( _session_mock, _amphora_mock.id, status=constants.ERROR) @@ -706,19 +781,36 @@ class TestAmphoraDriverTasks(base.TestCase): mock_listener_repo_get, mock_listener_repo_update, mock_amphora_repo_update): + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: False + } + } + amphora_vrrp_start_obj = ( amphora_driver_tasks.AmphoraIndexVRRPStart()) mock_driver.start_vrrp_service.side_effect = [mock.DEFAULT, Exception('boom')] - amphora_vrrp_start_obj.execute(0, [_amphora_mock], + amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status, timeout_dict=self.timeout_dict) mock_driver.start_vrrp_service.assert_called_once_with( _amphora_mock, self.timeout_dict) + # Unreachable amp + mock_driver.start_vrrp_service.reset_mock() + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: True + } + } + amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status, + timeout_dict=self.timeout_dict) + mock_driver.start_vrrp_service.assert_not_called() + # Test with a start exception mock_driver.start_vrrp_service.reset_mock() - amphora_vrrp_start_obj.execute(0, [_amphora_mock], + amphora_vrrp_start_obj.execute(0, [_amphora_mock], {}, timeout_dict=self.timeout_dict) mock_driver.start_vrrp_service.assert_called_once_with( _amphora_mock, self.timeout_dict) @@ -790,3 +882,75 @@ class TestAmphoraDriverTasks(base.TestCase): self.assertRaises(driver_except.TimeOutException, amp_config_update_obj.execute, _amphora_mock, flavor) + + @mock.patch('octavia.db.repositories.AmphoraRepository.get') + def test_amphorae_get_connectivity_status(self, + mock_amphora_repo_get, + mock_driver, + mock_generate_uuid, + mock_log, + mock_get_session, + mock_listener_repo_get, + mock_listener_repo_update, + mock_amphora_repo_update): + amphora1_mock = mock.MagicMock() + amphora1_mock.id = 'id1' + amphora2_mock = mock.MagicMock() + amphora2_mock.id = 'id2' + db_amphora1_mock = mock.Mock() + db_amphora2_mock = mock.Mock() + + amp_get_connectivity_status = ( + amphora_driver_tasks.AmphoraeGetConnectivityStatus()) + + # All amphorae reachable + mock_amphora_repo_get.side_effect = [ + db_amphora1_mock, + db_amphora2_mock] + mock_driver.check.return_value = None + + ret = amp_get_connectivity_status.execute( + [amphora1_mock, amphora2_mock], + amphora1_mock.id, + timeout_dict=self.timeout_dict) + mock_driver.check.assert_has_calls( + [mock.call(db_amphora1_mock, timeout_dict=self.timeout_dict), + mock.call(db_amphora2_mock, timeout_dict=self.timeout_dict)]) + self.assertFalse( + ret[amphora1_mock.id][constants.UNREACHABLE]) + self.assertFalse( + ret[amphora2_mock.id][constants.UNREACHABLE]) + + # amphora1 unreachable + mock_driver.check.reset_mock() + mock_amphora_repo_get.side_effect = [ + db_amphora1_mock, + db_amphora2_mock] + mock_driver.check.side_effect = [ + driver_except.TimeOutException, None] + self.assertRaises(driver_except.TimeOutException, + amp_get_connectivity_status.execute, + [amphora1_mock, amphora2_mock], + amphora1_mock.id, + timeout_dict=self.timeout_dict) + mock_driver.check.assert_called_with( + db_amphora1_mock, timeout_dict=self.timeout_dict) + + # amphora2 unreachable + mock_driver.check.reset_mock() + mock_amphora_repo_get.side_effect = [ + db_amphora1_mock, + db_amphora2_mock] + mock_driver.check.side_effect = [ + None, driver_except.TimeOutException] + ret = amp_get_connectivity_status.execute( + [amphora1_mock, amphora2_mock], + amphora1_mock.id, + timeout_dict=self.timeout_dict) + mock_driver.check.assert_has_calls( + [mock.call(db_amphora1_mock, timeout_dict=self.timeout_dict), + mock.call(db_amphora2_mock, timeout_dict=self.timeout_dict)]) + self.assertFalse( + ret[amphora1_mock.id][constants.UNREACHABLE]) + self.assertTrue( + ret[amphora2_mock.id][constants.UNREACHABLE]) diff --git a/octavia/tests/unit/controller/worker/v2/flows/test_amphora_flows.py b/octavia/tests/unit/controller/worker/v2/flows/test_amphora_flows.py index feeeb17910..3f5ebd0917 100644 --- a/octavia/tests/unit/controller/worker/v2/flows/test_amphora_flows.py +++ b/octavia/tests/unit/controller/worker/v2/flows/test_amphora_flows.py @@ -286,6 +286,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORA, amp_flow.provides) self.assertIn(constants.AMPHORA_ID, amp_flow.provides) self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.BASE_PORT, amp_flow.provides) self.assertIn(constants.COMPUTE_ID, amp_flow.provides) @@ -296,7 +297,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.VIP_SG_ID, amp_flow.provides) self.assertEqual(7, len(amp_flow.requires)) - self.assertEqual(13, len(amp_flow.provides)) + self.assertEqual(14, len(amp_flow.provides)) def test_get_failover_flow_standalone(self, mock_get_net_driver): failed_amphora = data_models.Amphora( @@ -320,6 +321,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORA, amp_flow.provides) self.assertIn(constants.AMPHORA_ID, amp_flow.provides) self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.BASE_PORT, amp_flow.provides) self.assertIn(constants.COMPUTE_ID, amp_flow.provides) @@ -330,7 +332,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.VIP_SG_ID, amp_flow.provides) self.assertEqual(7, len(amp_flow.requires)) - self.assertEqual(12, len(amp_flow.provides)) + self.assertEqual(13, len(amp_flow.provides)) def test_get_failover_flow_bogus_role(self, mock_get_net_driver): failed_amphora = data_models.Amphora(id=uuidutils.generate_uuid(), @@ -368,12 +370,30 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides) self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides) + self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.provides) self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) + self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires) + + self.assertEqual(3, len(vrrp_subflow.provides)) + self.assertEqual(3, len(vrrp_subflow.requires)) + + def test_get_vrrp_subflow_dont_get_status(self, mock_get_net_driver): + vrrp_subflow = self.AmpFlow.get_vrrp_subflow('123', + get_amphorae_status=False) + + self.assertIsInstance(vrrp_subflow, flow.Flow) + + self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides) + self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides) + + self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) + self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) + self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.requires) self.assertEqual(2, len(vrrp_subflow.provides)) - self.assertEqual(2, len(vrrp_subflow.requires)) + self.assertEqual(3, len(vrrp_subflow.requires)) def test_get_vrrp_subflow_dont_create_vrrp_group( self, mock_get_net_driver): @@ -384,12 +404,14 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides) self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides) + self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.provides) self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) + self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires) - self.assertEqual(2, len(vrrp_subflow.provides)) - self.assertEqual(2, len(vrrp_subflow.requires)) + self.assertEqual(3, len(vrrp_subflow.provides)) + self.assertEqual(3, len(vrrp_subflow.requires)) def test_update_amphora_config_flow(self, mock_get_net_driver): diff --git a/octavia/tests/unit/controller/worker/v2/flows/test_load_balancer_flows.py b/octavia/tests/unit/controller/worker/v2/flows/test_load_balancer_flows.py index ee343304c5..df7cec6d2f 100644 --- a/octavia/tests/unit/controller/worker/v2/flows/test_load_balancer_flows.py +++ b/octavia/tests/unit/controller/worker/v2/flows/test_load_balancer_flows.py @@ -174,14 +174,16 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires) self.assertIn(constants.UPDATE_DICT, amp_flow.requires) + self.assertIn(constants.AMPHORA_ID, amp_flow.requires) self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.LOADBALANCER, amp_flow.provides) - self.assertEqual(2, len(amp_flow.requires), amp_flow.requires) - self.assertEqual(4, len(amp_flow.provides), amp_flow.provides) + self.assertEqual(3, len(amp_flow.requires), amp_flow.requires) + self.assertEqual(5, len(amp_flow.provides), amp_flow.provides) amp_flow = self.LBFlow.get_post_lb_amp_association_flow( '123', constants.TOPOLOGY_ACTIVE_STANDBY) @@ -190,14 +192,16 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires) self.assertIn(constants.UPDATE_DICT, amp_flow.requires) + self.assertIn(constants.AMPHORA_ID, amp_flow.requires) self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides) self.assertIn(constants.LOADBALANCER, amp_flow.provides) - self.assertEqual(2, len(amp_flow.requires), amp_flow.requires) - self.assertEqual(4, len(amp_flow.provides), amp_flow.provides) + self.assertEqual(3, len(amp_flow.requires), amp_flow.requires) + self.assertEqual(5, len(amp_flow.provides), amp_flow.provides) def test_get_create_load_balancer_flows_single_listeners( self, mock_get_net_driver): @@ -255,6 +259,7 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.AMPHORA_ID, create_flow.provides) self.assertIn(constants.AMPHORA_NETWORK_CONFIG, create_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, create_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, create_flow.provides) self.assertIn(constants.COMPUTE_ID, create_flow.provides) self.assertIn(constants.COMPUTE_OBJ, create_flow.provides) self.assertIn(constants.DELTAS, create_flow.provides) @@ -265,7 +270,7 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.VIP, create_flow.provides) self.assertEqual(6, len(create_flow.requires), create_flow.requires) - self.assertEqual(16, len(create_flow.provides), + self.assertEqual(17, len(create_flow.provides), create_flow.provides) def _test_get_failover_LB_flow_single(self, amphorae): diff --git a/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py b/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py index dd3dac3568..a04a127c84 100644 --- a/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py +++ b/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py @@ -132,17 +132,35 @@ class TestAmphoraDriverTasks(base.TestCase): mock_amphora_repo_get.return_value = _db_amphora_mock mock_lb_get.return_value = _db_load_balancer_mock + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: False + } + } + amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate() amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], - self.timeout_dict) + amphorae_status, self.timeout_dict) mock_driver.update_amphora_listeners.assert_called_once_with( _db_load_balancer_mock, _db_amphora_mock, self.timeout_dict) + # Unreachable amp + mock_driver.reset_mock() + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: True + } + } + amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], + amphorae_status, self.timeout_dict) + mock_driver.update_amphora_listeners.assert_not_called() + + # Test exception mock_driver.update_amphora_listeners.side_effect = Exception('boom') - amp_list_update_obj.execute(_LB_mock, 0, - [_amphora_mock], self.timeout_dict) + amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], {}, + self.timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, AMP_ID, status=constants.ERROR) @@ -194,37 +212,54 @@ class TestAmphoraDriverTasks(base.TestCase): mock_driver, mock_generate_uuid, mock_log, mock_get_session, mock_listener_repo_get, mock_listener_repo_update, mock_amphora_repo_get, mock_amphora_repo_update): - amphora_mock = mock.MagicMock() listeners_reload_obj = ( amphora_driver_tasks.AmphoraIndexListenersReload()) mock_lb = mock.MagicMock() mock_listener = mock.MagicMock() mock_listener.id = '12345' - mock_amphora_repo_get.return_value = amphora_mock + mock_amphora_repo_get.return_value = _amphora_mock mock_lb_repo_get.return_value = mock_lb mock_driver.reload.side_effect = [mock.DEFAULT, Exception('boom')] # Test no listeners mock_lb.listeners = None - listeners_reload_obj.execute(mock_lb, 0, None) + listeners_reload_obj.execute(mock_lb, 0, None, {}) mock_driver.reload.assert_not_called() # Test with listeners - mock_driver.start.reset_mock() + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: False + } + } + mock_driver.reload.reset_mock() mock_lb.listeners = [mock_listener] - listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], + listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], + amphorae_status, timeout_dict=self.timeout_dict) - mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, + mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock, self.timeout_dict) + # Unreachable amp + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: True + } + } + mock_driver.reload.reset_mock() + listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], + amphorae_status, + timeout_dict=self.timeout_dict) + mock_driver.reload.assert_not_called() + # Test with reload exception mock_driver.reload.reset_mock() - listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], + listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], {}, timeout_dict=self.timeout_dict) - mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, + mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock, self.timeout_dict) mock_amphora_repo_update.assert_called_once_with( - _session_mock, amphora_mock[constants.ID], + _session_mock, _amphora_mock[constants.ID], status=constants.ERROR) @mock.patch('octavia.controller.worker.task_utils.TaskUtils.' @@ -728,6 +763,11 @@ class TestAmphoraDriverTasks(base.TestCase): FAKE_INTERFACE = 'fake0' mock_driver.get_interface_from_ip.side_effect = [FAKE_INTERFACE, Exception('boom')] + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: False + } + } timeout_dict = {constants.CONN_MAX_RETRIES: CONN_MAX_RETRIES, constants.CONN_RETRY_INTERVAL: CONN_RETRY_INTERVAL} @@ -735,17 +775,28 @@ class TestAmphoraDriverTasks(base.TestCase): amphora_update_vrrp_interface_obj = ( amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface()) amphora_update_vrrp_interface_obj.execute( - 0, [_amphora_mock], timeout_dict) + 0, [_amphora_mock], amphorae_status, timeout_dict) mock_driver.get_interface_from_ip.assert_called_once_with( _db_amphora_mock, _db_amphora_mock.vrrp_ip, timeout_dict=timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, _db_amphora_mock.id, vrrp_interface=FAKE_INTERFACE) + # Unreachable amp + mock_driver.reset_mock() + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: True + } + } + amphora_update_vrrp_interface_obj.execute( + 0, [_amphora_mock], amphorae_status, timeout_dict) + mock_driver.get_interface_from_ip.assert_not_called() + # Test with an exception mock_amphora_repo_update.reset_mock() amphora_update_vrrp_interface_obj.execute( - 0, [_amphora_mock], timeout_dict) + 0, [_amphora_mock], {}, timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, _db_amphora_mock.id, status=constants.ERROR) @@ -796,20 +847,40 @@ class TestAmphoraDriverTasks(base.TestCase): Exception('boom')] mock_lb_get.return_value = _db_load_balancer_mock mock_amphora_repo_get.return_value = _db_amphora_mock + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: False + } + } + amphora_vrrp_update_obj = ( amphora_driver_tasks.AmphoraIndexVRRPUpdate()) amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, - 0, [_amphora_mock], 'fakeint0', + 0, [_amphora_mock], amphorae_status, + 'fakeint0', timeout_dict=self.timeout_dict) mock_driver.update_vrrp_conf.assert_called_once_with( _db_load_balancer_mock, amphorae_network_config, _db_amphora_mock, self.timeout_dict) + # Unreachable amp + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: True + } + } + mock_amphora_repo_update.reset_mock() + mock_driver.update_vrrp_conf.reset_mock() + amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, + 0, [_amphora_mock], amphorae_status, + None) + mock_driver.update_vrrp_conf.assert_not_called() + # Test with an exception mock_amphora_repo_update.reset_mock() amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, - 0, [_amphora_mock], 'fakeint0') + 0, [_amphora_mock], {}, 'fakeint0') mock_amphora_repo_update.assert_called_once_with( _session_mock, _db_amphora_mock.id, status=constants.ERROR) @@ -840,19 +911,36 @@ class TestAmphoraDriverTasks(base.TestCase): mock_amphora_repo_get, mock_amphora_repo_update): mock_amphora_repo_get.return_value = _db_amphora_mock + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: False + } + } + amphora_vrrp_start_obj = ( amphora_driver_tasks.AmphoraIndexVRRPStart()) mock_driver.start_vrrp_service.side_effect = [mock.DEFAULT, Exception('boom')] - amphora_vrrp_start_obj.execute(0, [_amphora_mock], + amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status, timeout_dict=self.timeout_dict) mock_driver.start_vrrp_service.assert_called_once_with( _db_amphora_mock, self.timeout_dict) + # Unreachable amp + mock_driver.start_vrrp_service.reset_mock() + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: True + } + } + amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status, + timeout_dict=self.timeout_dict) + mock_driver.start_vrrp_service.assert_not_called() + # Test with a start exception mock_driver.start_vrrp_service.reset_mock() - amphora_vrrp_start_obj.execute(0, [_amphora_mock], + amphora_vrrp_start_obj.execute(0, [_amphora_mock], {}, timeout_dict=self.timeout_dict) mock_driver.start_vrrp_service.assert_called_once_with( _db_amphora_mock, self.timeout_dict) @@ -930,3 +1018,74 @@ class TestAmphoraDriverTasks(base.TestCase): self.assertRaises(driver_except.TimeOutException, amp_config_update_obj.execute, _amphora_mock, flavor) + + def test_amphorae_get_connectivity_status(self, + mock_driver, + mock_generate_uuid, + mock_log, + mock_get_session, + mock_listener_repo_get, + mock_listener_repo_update, + mock_amphora_repo_get, + mock_amphora_repo_update): + amphora1_mock = mock.MagicMock() + amphora1_mock[constants.ID] = 'id1' + amphora2_mock = mock.MagicMock() + amphora2_mock[constants.ID] = 'id2' + db_amphora1_mock = mock.Mock() + db_amphora2_mock = mock.Mock() + + amp_get_connectivity_status = ( + amphora_driver_tasks.AmphoraeGetConnectivityStatus()) + + # All amphorae reachable + mock_amphora_repo_get.side_effect = [ + db_amphora1_mock, + db_amphora2_mock] + mock_driver.check.return_value = None + + ret = amp_get_connectivity_status.execute( + [amphora1_mock, amphora2_mock], + amphora1_mock[constants.ID], + timeout_dict=self.timeout_dict) + mock_driver.check.assert_has_calls( + [mock.call(db_amphora1_mock, timeout_dict=self.timeout_dict), + mock.call(db_amphora2_mock, timeout_dict=self.timeout_dict)]) + self.assertFalse( + ret[amphora1_mock[constants.ID]][constants.UNREACHABLE]) + self.assertFalse( + ret[amphora2_mock[constants.ID]][constants.UNREACHABLE]) + + # amphora1 unreachable + mock_driver.check.reset_mock() + mock_amphora_repo_get.side_effect = [ + db_amphora1_mock, + db_amphora2_mock] + mock_driver.check.side_effect = [ + driver_except.TimeOutException, None] + self.assertRaises(driver_except.TimeOutException, + amp_get_connectivity_status.execute, + [amphora1_mock, amphora2_mock], + amphora1_mock[constants.ID], + timeout_dict=self.timeout_dict) + mock_driver.check.assert_called_with( + db_amphora1_mock, timeout_dict=self.timeout_dict) + + # amphora2 unreachable + mock_driver.check.reset_mock() + mock_amphora_repo_get.side_effect = [ + db_amphora1_mock, + db_amphora2_mock] + mock_driver.check.side_effect = [ + None, driver_except.TimeOutException] + ret = amp_get_connectivity_status.execute( + [amphora1_mock, amphora2_mock], + amphora1_mock[constants.ID], + timeout_dict=self.timeout_dict) + mock_driver.check.assert_has_calls( + [mock.call(db_amphora1_mock, timeout_dict=self.timeout_dict), + mock.call(db_amphora2_mock, timeout_dict=self.timeout_dict)]) + self.assertFalse( + ret[amphora1_mock[constants.ID]][constants.UNREACHABLE]) + self.assertTrue( + ret[amphora2_mock[constants.ID]][constants.UNREACHABLE]) diff --git a/releasenotes/notes/reduce-duration-failover-636032433984d911.yaml b/releasenotes/notes/reduce-duration-failover-636032433984d911.yaml new file mode 100644 index 0000000000..21d5718d6e --- /dev/null +++ b/releasenotes/notes/reduce-duration-failover-636032433984d911.yaml @@ -0,0 +1,7 @@ +--- +fixes: + - | + Reduce the duration of the failovers of ACTIVE_STANDBY load balancers. Many + updates of an unreachable amphora may have been attempted during a + failover, now if an amphora is not reachable at the first update, the other + updates are skipped.