From 68cb6e38800d03746236d0ed3e910658afe54323 Mon Sep 17 00:00:00 2001 From: Gregory Thiemonge Date: Fri, 1 Sep 2023 09:08:37 -0400 Subject: [PATCH] Reduce duration of failovers with amphora in ERROR In the failover flow, there are multiple tasks for the configuration of VRRP for the other amphorae of the load balancer, but during an outage the other amps may not be available. To prevent the tasks from attempting connections to unreachable amphorae, we can detect in the first task that an amp is unreachable and pass this information to the other tasks. Those connection attempts could have taken a lot of time, between 15 min and 40 min depending on the configuration of Octavia and the provider driver (amphorav1 or amphorav2). Closes-Bug: #2033894 Note: for stable/2023.1 and older, the patch also includes modifications in octavia/controller/worker/v1/ Conflicts: octavia/controller/worker/v2/tasks/amphora_driver_tasks.py Change-Id: Ib33a0b8d2875e4ff97c65933fe9360bb06994d32 (cherry picked from commit ca70587f0bcb4b2a05bd5c243f111140a5762114) (cherry picked from commit 6fdc2079da3392fdfbdf4a80df05bfe7d149ca95) (cherry picked from commit 97b1b8387e42f8361b612a0b81b1c60b6e14db37) (cherry picked from commit dd849b4c5c1a1d33e4fd4a9afa2f35903d4d9aac) --- octavia/amphorae/drivers/driver_base.py | 16 ++ .../drivers/haproxy/rest_api_driver.py | 23 ++- .../amphorae/drivers/noop_driver/driver.py | 3 + octavia/common/constants.py | 4 + .../worker/v1/flows/amphora_flows.py | 43 +++- .../worker/v1/flows/load_balancer_flows.py | 17 +- .../worker/v1/tasks/amphora_driver_tasks.py | 94 ++++++++- .../worker/v2/flows/amphora_flows.py | 43 +++- .../worker/v2/flows/load_balancer_flows.py | 17 +- .../worker/v2/tasks/amphora_driver_tasks.py | 90 +++++++- .../drivers/haproxy/test_rest_api_driver.py | 6 +- .../worker/v1/flows/test_amphora_flows.py | 32 ++- .../v1/flows/test_load_balancer_flows.py | 26 ++- .../v1/tasks/test_amphora_driver_tasks.py | 184 ++++++++++++++++-- 
.../worker/v2/flows/test_amphora_flows.py | 32 ++- .../v2/flows/test_load_balancer_flows.py | 15 +- .../v2/tasks/test_amphora_driver_tasks.py | 195 ++++++++++++++++-- ...ce-duration-failover-636032433984d911.yaml | 7 + 18 files changed, 745 insertions(+), 102 deletions(-) create mode 100644 releasenotes/notes/reduce-duration-failover-636032433984d911.yaml diff --git a/octavia/amphorae/drivers/driver_base.py b/octavia/amphorae/drivers/driver_base.py index 8a4505e55c..6b83adf69d 100644 --- a/octavia/amphorae/drivers/driver_base.py +++ b/octavia/amphorae/drivers/driver_base.py @@ -14,6 +14,9 @@ # under the License. import abc +from typing import Optional + +from octavia.db import models as db_models class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta): @@ -231,6 +234,19 @@ class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta): :type timeout_dict: dict """ + @abc.abstractmethod + def check(self, amphora: db_models.Amphora, + timeout_dict: Optional[dict] = None): + """Check connectivity to the amphora. + + :param amphora: The amphora to query. + :param timeout_dict: Dictionary of timeout values for calls to the + amphora. 
May contain: req_conn_timeout, + req_read_timeout, conn_max_retries, + conn_retry_interval + :raises TimeOutException: The amphora didn't reply + """ + class VRRPDriverMixin(object, metaclass=abc.ABCMeta): """Abstract mixin class for VRRP support in loadbalancer amphorae diff --git a/octavia/amphorae/drivers/haproxy/rest_api_driver.py b/octavia/amphorae/drivers/haproxy/rest_api_driver.py index 234e769682..482694c7e2 100644 --- a/octavia/amphorae/drivers/haproxy/rest_api_driver.py +++ b/octavia/amphorae/drivers/haproxy/rest_api_driver.py @@ -17,6 +17,7 @@ import hashlib import os import ssl import time +from typing import Optional import warnings from oslo_context import context as oslo_context @@ -38,6 +39,7 @@ from octavia.common.jinja.lvs import jinja_cfg as jinja_udp_cfg from octavia.common.tls_utils import cert_parser from octavia.common import utils from octavia.db import api as db_apis +from octavia.db import models as db_models from octavia.db import repositories as repo from octavia.network import data_models as network_models @@ -115,6 +117,11 @@ class HaproxyAmphoraLoadBalancerDriver( amphora.id, amphora.api_version) return list(map(int, amphora.api_version.split('.'))) + def check(self, amphora: db_models.Amphora, + timeout_dict: Optional[dict] = None): + """Check connectivity to the amphora.""" + self._populate_amphora_api_version(amphora, timeout_dict) + def update_amphora_listeners(self, loadbalancer, amphora, timeout_dict=None): """Update the amphora with a new configuration. @@ -635,15 +642,15 @@ class HaproxyAmphoraLoadBalancerDriver( req_read_timeout, conn_max_retries, conn_retry_interval :type timeout_dict: dict - :returns: None if not found, the interface name string if found. + :returns: the interface name string if found. 
+ :raises octavia.amphorae.drivers.haproxy.exceptions.NotFound: + No interface found on the amphora + :raises TimeOutException: The amphora didn't reply """ - try: - self._populate_amphora_api_version(amphora, timeout_dict) - response_json = self.clients[amphora.api_version].get_interface( - amphora, ip_address, timeout_dict, log_error=False) - return response_json.get('interface', None) - except (exc.NotFound, driver_except.TimeOutException): - return None + self._populate_amphora_api_version(amphora, timeout_dict) + response_json = self.clients[amphora.api_version].get_interface( + amphora, ip_address, timeout_dict, log_error=False) + return response_json.get('interface', None) # Check a custom hostname diff --git a/octavia/amphorae/drivers/noop_driver/driver.py b/octavia/amphorae/drivers/noop_driver/driver.py index 8d6bc52def..77b4bf04e0 100644 --- a/octavia/amphorae/drivers/noop_driver/driver.py +++ b/octavia/amphorae/drivers/noop_driver/driver.py @@ -195,3 +195,6 @@ class NoopAmphoraLoadBalancerDriver( def reload_vrrp_service(self, loadbalancer): pass + + def check(self, amphora, timeout_dict=None): + pass diff --git a/octavia/common/constants.py b/octavia/common/constants.py index f3a17d3d0b..2d303a010f 100644 --- a/octavia/common/constants.py +++ b/octavia/common/constants.py @@ -313,6 +313,7 @@ AMPHORA_INDEX = 'amphora_index' AMPHORA_NETWORK_CONFIG = 'amphora_network_config' AMPHORAE = 'amphorae' AMPHORAE_NETWORK_CONFIG = 'amphorae_network_config' +AMPHORAE_STATUS = 'amphorae_status' AMPS_DATA = 'amps_data' ANTI_AFFINITY = 'anti-affinity' ATTEMPT_NUMBER = 'attempt_number' @@ -384,6 +385,7 @@ MESSAGE = 'message' NAME = 'name' NETWORK = 'network' NETWORK_ID = 'network_id' +NEW_AMPHORA_ID = 'new_amphora_id' NEXTHOP = 'nexthop' NICS = 'nics' OBJECT = 'object' @@ -430,6 +432,7 @@ TLS_CERTIFICATE_ID = 'tls_certificate_id' TLS_CONTAINER_ID = 'tls_container_id' TOPOLOGY = 'topology' TOTAL_CONNECTIONS = 'total_connections' +UNREACHABLE = 'unreachable' UPDATED_AT = 
'updated_at' UPDATE_DICT = 'update_dict' UPDATED_PORTS = 'updated_ports' @@ -557,6 +560,7 @@ ADMIN_DOWN_PORT = 'admin-down-port' AMPHORA_POST_VIP_PLUG = 'amphora-post-vip-plug' AMPHORA_RELOAD_LISTENER = 'amphora-reload-listener' AMPHORA_TO_ERROR_ON_REVERT = 'amphora-to-error-on-revert' +AMPHORAE_GET_CONNECTIVITY_STATUS = 'amphorae-get-connectivity-status' AMPHORAE_POST_NETWORK_PLUG = 'amphorae-post-network-plug' ATTACH_PORT = 'attach-port' CALCULATE_AMPHORA_DELTA = 'calculate-amphora-delta' diff --git a/octavia/controller/worker/v1/flows/amphora_flows.py b/octavia/controller/worker/v1/flows/amphora_flows.py index ac8e37655d..aaaff1e6db 100644 --- a/octavia/controller/worker/v1/flows/amphora_flows.py +++ b/octavia/controller/worker/v1/flows/amphora_flows.py @@ -240,7 +240,8 @@ class AmphoraFlows(object): return delete_amphora_flow def get_vrrp_subflow(self, prefix, timeout_dict=None, - create_vrrp_group=True): + create_vrrp_group=True, + get_amphorae_status=True): sf_name = prefix + '-' + constants.GET_VRRP_SUBFLOW vrrp_subflow = linear_flow.Flow(sf_name) @@ -256,6 +257,17 @@ class AmphoraFlows(object): requires=constants.LOADBALANCER_ID, provides=constants.AMPHORAE_NETWORK_CONFIG)) + if get_amphorae_status: + # Get the amphorae_status dict in case the caller hasn't fetched + # it yet. + vrrp_subflow.add( + amphora_driver_tasks.AmphoraeGetConnectivityStatus( + name=constants.AMPHORAE_GET_CONNECTIVITY_STATUS, + requires=constants.AMPHORAE, + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, + inject={constants.TIMEOUT_DICT: timeout_dict}, + provides=constants.AMPHORAE_STATUS)) + # VRRP update needs to be run on all amphora to update # their peer configurations. So parallelize this with an # unordered subflow. 
@@ -266,7 +278,7 @@ class AmphoraFlows(object): amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict}, provides=constants.AMP_VRRP_INT)) @@ -275,13 +287,13 @@ class AmphoraFlows(object): name=sf_name + '-0-' + constants.AMP_VRRP_UPDATE, requires=(constants.LOADBALANCER_ID, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, - constants.AMP_VRRP_INT), + constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( name=sf_name + '-0-' + constants.AMP_VRRP_START, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) @@ -289,7 +301,7 @@ class AmphoraFlows(object): amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict}, provides=constants.AMP_VRRP_INT)) @@ -298,12 +310,12 @@ class AmphoraFlows(object): name=sf_name + '-1-' + constants.AMP_VRRP_UPDATE, requires=(constants.LOADBALANCER_ID, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, - constants.AMP_VRRP_INT), + constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( name=sf_name + '-1-' + constants.AMP_VRRP_START, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: 
timeout_dict})) @@ -551,6 +563,14 @@ class AmphoraFlows(object): constants.CONN_RETRY_INTERVAL: CONF.haproxy_amphora.active_connection_retry_interval} + failover_amp_flow.add( + amphora_driver_tasks.AmphoraeGetConnectivityStatus( + name=constants.AMPHORAE_GET_CONNECTIVITY_STATUS, + requires=constants.AMPHORAE, + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, + inject={constants.TIMEOUT_DICT: timeout_dict}, + provides=constants.AMPHORAE_STATUS)) + # Listeners update needs to be run on all amphora to update # their peer configurations. So parallelize this with an # unordered subflow. @@ -561,7 +581,8 @@ class AmphoraFlows(object): update_amps_subflow.add( amphora_driver_tasks.AmphoraIndexListenerUpdate( name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE, - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: amp_index, constants.TIMEOUT_DICT: timeout_dict})) @@ -571,7 +592,8 @@ class AmphoraFlows(object): if lb_amp_count == 2: failover_amp_flow.add( self.get_vrrp_subflow(constants.GET_VRRP_SUBFLOW, - timeout_dict, create_vrrp_group=False)) + timeout_dict, create_vrrp_group=False, + get_amphorae_status=False)) # Reload the listener. 
This needs to be done here because # it will create the required haproxy check scripts for @@ -587,7 +609,8 @@ class AmphoraFlows(object): amphora_driver_tasks.AmphoraIndexListenersReload( name=(str(amp_index) + '-' + constants.AMPHORA_RELOAD_LISTENER), - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: amp_index, constants.TIMEOUT_DICT: timeout_dict})) diff --git a/octavia/controller/worker/v1/flows/load_balancer_flows.py b/octavia/controller/worker/v1/flows/load_balancer_flows.py index 4fb87e8972..8630ff0473 100644 --- a/octavia/controller/worker/v1/flows/load_balancer_flows.py +++ b/octavia/controller/worker/v1/flows/load_balancer_flows.py @@ -621,6 +621,14 @@ class LoadBalancerFlows(object): requires=constants.LOADBALANCER_ID, provides=constants.AMPHORAE)) + failover_LB_flow.add( + amphora_driver_tasks.AmphoraeGetConnectivityStatus( + name=(new_amp_role + '-' + + constants.AMPHORAE_GET_CONNECTIVITY_STATUS), + requires=constants.AMPHORAE, + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, + provides=constants.AMPHORAE_STATUS)) + # Listeners update needs to be run on all amphora to update # their peer configurations. So parallelize this with an # unordered subflow. 
@@ -635,14 +643,16 @@ class LoadBalancerFlows(object): amphora_driver_tasks.AmphoraIndexListenerUpdate( name=(constants.AMPHORA + '-0-' + constants.AMP_LISTENER_UPDATE), - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) update_amps_subflow.add( amphora_driver_tasks.AmphoraIndexListenerUpdate( name=(constants.AMPHORA + '-1-' + constants.AMP_LISTENER_UPDATE), - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) @@ -651,7 +661,8 @@ class LoadBalancerFlows(object): # Configure and enable keepalived in the amphora failover_LB_flow.add(self.amp_flows.get_vrrp_subflow( new_amp_role + '-' + constants.GET_VRRP_SUBFLOW, - timeout_dict, create_vrrp_group=False)) + timeout_dict, create_vrrp_group=False, + get_amphorae_status=False)) # #### End of standby #### diff --git a/octavia/controller/worker/v1/tasks/amphora_driver_tasks.py b/octavia/controller/worker/v1/tasks/amphora_driver_tasks.py index beb3497588..9e62574310 100644 --- a/octavia/controller/worker/v1/tasks/amphora_driver_tasks.py +++ b/octavia/controller/worker/v1/tasks/amphora_driver_tasks.py @@ -12,6 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. # +from typing import List +from typing import Optional from cryptography import fernet from oslo_config import cfg @@ -73,10 +75,18 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask): """Task to update the listeners on one amphora.""" def execute(self, loadbalancer, amphora_index, amphorae, - timeout_dict=None): + amphorae_status: dict, timeout_dict=None): # Note, we don't want this to cause a revert as it may be used # in a failover flow with both amps failing. 
Skip it and let # health manager fix it. + + amphora_id = amphorae[amphora_index].id + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping listener update because amphora %s " + "is not reachable.", amphora_id) + return + try: # Make sure we have a fresh load balancer object loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(), @@ -84,7 +94,6 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask): self.amphora_driver.update_amphora_listeners( loadbalancer, amphorae[amphora_index], timeout_dict) except Exception as e: - amphora_id = amphorae[amphora_index].id LOG.error('Failed to update listeners on amphora %s. Skipping ' 'this amphora as it is failing to update due to: %s', amphora_id, str(e)) @@ -129,14 +138,23 @@ class AmphoraIndexListenersReload(BaseAmphoraTask): """Task to reload all listeners on an amphora.""" def execute(self, loadbalancer, amphora_index, amphorae, - timeout_dict=None): + amphorae_status: dict, timeout_dict=None): """Execute listener reload routines for listeners on an amphora.""" + if amphorae is None: + return + + amphora_id = amphorae[amphora_index].id + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping listener reload because amphora %s " + "is not reachable.", amphora_id) + return + if loadbalancer.listeners: try: self.amphora_driver.reload( loadbalancer, amphorae[amphora_index], timeout_dict) except Exception as e: - amphora_id = amphorae[amphora_index].id LOG.warning('Failed to reload listeners on amphora %s. 
' 'Skipping this amphora as it is failing to ' 'reload due to: %s', amphora_id, str(e)) @@ -305,8 +323,15 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask): class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask): """Task to get and update the VRRP interface device name from amphora.""" - def execute(self, amphora_index, amphorae, timeout_dict=None): + def execute(self, amphora_index, amphorae, amphorae_status: dict, + timeout_dict=None): amphora_id = amphorae[amphora_index].id + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping VRRP interface update because amphora %s " + "is not reachable.", amphora_id) + return None + try: interface = self.amphora_driver.get_interface_from_ip( amphorae[amphora_index], amphorae[amphora_index].vrrp_ip, @@ -354,14 +379,21 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask): """Task to update the VRRP configuration of an amphora.""" def execute(self, loadbalancer_id, amphorae_network_config, amphora_index, - amphorae, amp_vrrp_int, timeout_dict=None): + amphorae, amphorae_status: dict, amp_vrrp_int: Optional[str], + timeout_dict=None): """Execute update_vrrp_conf.""" - loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(), - id=loadbalancer_id) # Note, we don't want this to cause a revert as it may be used # in a failover flow with both amps failing. Skip it and let # health manager fix it. amphora_id = amphorae[amphora_index].id + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping VRRP configuration because amphora %s " + "is not reachable.", amphora_id) + return + + loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(), + id=loadbalancer_id) amphorae[amphora_index].vrrp_interface = amp_vrrp_int try: self.amphora_driver.update_vrrp_conf( @@ -394,8 +426,15 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask): This will reload keepalived if it is already running. 
""" - def execute(self, amphora_index, amphorae, timeout_dict=None): + def execute(self, amphora_index, amphorae, amphorae_status: dict, + timeout_dict=None): amphora_id = amphorae[amphora_index].id + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping VRRP start because amphora %s " + "is not reachable.", amphora_id) + return + try: self.amphora_driver.start_vrrp_service(amphorae[amphora_index], timeout_dict) @@ -451,3 +490,40 @@ class AmphoraConfigUpdate(BaseAmphoraTask): LOG.error('Amphora %s does not support agent configuration ' 'update. Please update the amphora image for this ' 'amphora. Skipping.', amphora.id) + + +class AmphoraeGetConnectivityStatus(BaseAmphoraTask): + """Task that checks amphorae connectivity status. + + Check and return the connectivity status of both amphorae in ACTIVE STANDBY + load balancers + """ + + def execute(self, amphorae: List[dict], new_amphora_id: str, + timeout_dict=None): + amphorae_status = {} + + for amphora in amphorae: + amphora_id = amphora.id + amphorae_status[amphora_id] = {} + + session = db_apis.get_session() + with session.begin(): + db_amp = self.amphora_repo.get(session, id=amphora_id) + + try: + # Verify if the amphora is reachable + self.amphora_driver.check(db_amp, timeout_dict=timeout_dict) + except Exception as e: + LOG.exception("Cannot get status for amphora %s", + amphora_id) + # In case it fails and the tested amphora is the newly created + # amphora, it's not a normal error handling, re-raise the + # exception + if amphora_id == new_amphora_id: + raise e + amphorae_status[amphora_id][constants.UNREACHABLE] = True + else: + amphorae_status[amphora_id][constants.UNREACHABLE] = False + + return amphorae_status diff --git a/octavia/controller/worker/v2/flows/amphora_flows.py b/octavia/controller/worker/v2/flows/amphora_flows.py index 2883c13866..237fc6d4bb 100644 --- a/octavia/controller/worker/v2/flows/amphora_flows.py +++ 
b/octavia/controller/worker/v2/flows/amphora_flows.py @@ -226,7 +226,8 @@ class AmphoraFlows(object): return delete_amphora_flow def get_vrrp_subflow(self, prefix, timeout_dict=None, - create_vrrp_group=True): + create_vrrp_group=True, + get_amphorae_status=True): sf_name = prefix + '-' + constants.GET_VRRP_SUBFLOW vrrp_subflow = linear_flow.Flow(sf_name) @@ -242,6 +243,17 @@ class AmphoraFlows(object): requires=constants.LOADBALANCER_ID, provides=constants.AMPHORAE_NETWORK_CONFIG)) + if get_amphorae_status: + # Get the amphorae_status dict in case the caller hasn't fetched + # it yet. + vrrp_subflow.add( + amphora_driver_tasks.AmphoraeGetConnectivityStatus( + name=constants.AMPHORAE_GET_CONNECTIVITY_STATUS, + requires=constants.AMPHORAE, + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, + inject={constants.TIMEOUT_DICT: timeout_dict}, + provides=constants.AMPHORAE_STATUS)) + # VRRP update needs to be run on all amphora to update # their peer configurations. So parallelize this with an # unordered subflow. 
@@ -252,7 +264,7 @@ class AmphoraFlows(object): amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict}, provides=constants.AMP_VRRP_INT)) @@ -261,13 +273,13 @@ class AmphoraFlows(object): name=sf_name + '-0-' + constants.AMP_VRRP_UPDATE, requires=(constants.LOADBALANCER_ID, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, - constants.AMP_VRRP_INT), + constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( name=sf_name + '-0-' + constants.AMP_VRRP_START, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) @@ -275,7 +287,7 @@ class AmphoraFlows(object): amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict}, provides=constants.AMP_VRRP_INT)) @@ -284,12 +296,12 @@ class AmphoraFlows(object): name=sf_name + '-1-' + constants.AMP_VRRP_UPDATE, requires=(constants.LOADBALANCER_ID, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, - constants.AMP_VRRP_INT), + constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( name=sf_name + '-1-' + constants.AMP_VRRP_START, - requires=constants.AMPHORAE, + requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: 
timeout_dict})) @@ -537,6 +549,14 @@ class AmphoraFlows(object): constants.CONN_RETRY_INTERVAL: CONF.haproxy_amphora.active_connection_retry_interval} + failover_amp_flow.add( + amphora_driver_tasks.AmphoraeGetConnectivityStatus( + name=constants.AMPHORAE_GET_CONNECTIVITY_STATUS, + requires=constants.AMPHORAE, + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, + inject={constants.TIMEOUT_DICT: timeout_dict}, + provides=constants.AMPHORAE_STATUS)) + # Listeners update needs to be run on all amphora to update # their peer configurations. So parallelize this with an # unordered subflow. @@ -547,7 +567,8 @@ class AmphoraFlows(object): update_amps_subflow.add( amphora_driver_tasks.AmphoraIndexListenerUpdate( name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE, - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: amp_index, constants.TIMEOUT_DICT: timeout_dict})) @@ -557,7 +578,8 @@ class AmphoraFlows(object): if lb_amp_count == 2: failover_amp_flow.add( self.get_vrrp_subflow(constants.GET_VRRP_SUBFLOW, - timeout_dict, create_vrrp_group=False)) + timeout_dict, create_vrrp_group=False, + get_amphorae_status=False)) # Reload the listener. 
This needs to be done here because # it will create the required haproxy check scripts for @@ -573,7 +595,8 @@ class AmphoraFlows(object): amphora_driver_tasks.AmphoraIndexListenersReload( name=(str(amp_index) + '-' + constants.AMPHORA_RELOAD_LISTENER), - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: amp_index, constants.TIMEOUT_DICT: timeout_dict})) diff --git a/octavia/controller/worker/v2/flows/load_balancer_flows.py b/octavia/controller/worker/v2/flows/load_balancer_flows.py index 8c6b48905f..93b0037247 100644 --- a/octavia/controller/worker/v2/flows/load_balancer_flows.py +++ b/octavia/controller/worker/v2/flows/load_balancer_flows.py @@ -611,6 +611,14 @@ class LoadBalancerFlows(object): requires=constants.LOADBALANCER_ID, provides=constants.AMPHORAE)) + failover_LB_flow.add( + amphora_driver_tasks.AmphoraeGetConnectivityStatus( + name=(new_amp_role + '-' + + constants.AMPHORAE_GET_CONNECTIVITY_STATUS), + requires=constants.AMPHORAE, + rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID}, + provides=constants.AMPHORAE_STATUS)) + # Listeners update needs to be run on all amphora to update # their peer configurations. So parallelize this with an # unordered subflow. 
@@ -625,14 +633,16 @@ class LoadBalancerFlows(object): amphora_driver_tasks.AmphoraIndexListenerUpdate( name=(constants.AMPHORA + '-0-' + constants.AMP_LISTENER_UPDATE), - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 0, constants.TIMEOUT_DICT: timeout_dict})) update_amps_subflow.add( amphora_driver_tasks.AmphoraIndexListenerUpdate( name=(constants.AMPHORA + '-1-' + constants.AMP_LISTENER_UPDATE), - requires=(constants.LOADBALANCER, constants.AMPHORAE), + requires=(constants.LOADBALANCER, constants.AMPHORAE, + constants.AMPHORAE_STATUS), inject={constants.AMPHORA_INDEX: 1, constants.TIMEOUT_DICT: timeout_dict})) @@ -641,7 +651,8 @@ class LoadBalancerFlows(object): # Configure and enable keepalived in the amphora failover_LB_flow.add(self.amp_flows.get_vrrp_subflow( new_amp_role + '-' + constants.GET_VRRP_SUBFLOW, - timeout_dict, create_vrrp_group=False)) + timeout_dict, create_vrrp_group=False, + get_amphorae_status=False)) # #### End of standby #### diff --git a/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py b/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py index 26585b14ed..eb312e5a14 100644 --- a/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py +++ b/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py @@ -14,6 +14,9 @@ # import copy +from typing import List +from typing import Optional + from cryptography import fernet from oslo_config import cfg from oslo_log import log as logging @@ -99,10 +102,19 @@ class AmpListenersUpdate(BaseAmphoraTask): class AmphoraIndexListenerUpdate(BaseAmphoraTask): """Task to update the listeners on one amphora.""" - def execute(self, loadbalancer, amphora_index, amphorae, timeout_dict=()): + def execute(self, loadbalancer, amphora_index, amphorae, + amphorae_status: dict, timeout_dict=()): # Note, we don't want this to cause a revert as it may be used # in a failover 
flow with both amps failing. Skip it and let # health manager fix it. + + amphora_id = amphorae[amphora_index].get(constants.ID) + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping listener update because amphora %s " + "is not reachable.", amphora_id) + return + try: # TODO(johnsom) Optimize this to use the dicts and not need the # DB lookups @@ -115,7 +127,6 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask): self.amphora_driver.update_amphora_listeners( db_lb, db_amp, timeout_dict) except Exception as e: - amphora_id = amphorae[amphora_index].get(constants.ID) LOG.error('Failed to update listeners on amphora %s. Skipping ' 'this amphora as it is failing to update due to: %s', amphora_id, str(e)) @@ -177,10 +188,18 @@ class AmphoraIndexListenersReload(BaseAmphoraTask): """Task to reload all listeners on an amphora.""" def execute(self, loadbalancer, amphora_index, amphorae, - timeout_dict=None): + amphorae_status: dict, timeout_dict=None): """Execute listener reload routines for listeners on an amphora.""" if amphorae is None: return + + amphora_id = amphorae[amphora_index].get(constants.ID) + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping listener reload because amphora %s " + "is not reachable.", amphora_id) + return + # TODO(johnsom) Optimize this to use the dicts and not need the # DB lookups db_amp = self.amphora_repo.get( @@ -192,7 +211,6 @@ class AmphoraIndexListenersReload(BaseAmphoraTask): try: self.amphora_driver.reload(db_lb, db_amp, timeout_dict) except Exception as e: - amphora_id = amphorae[amphora_index][constants.ID] LOG.warning('Failed to reload listeners on amphora %s. 
' 'Skipping this amphora as it is failing to ' 'reload due to: %s', amphora_id, str(e)) @@ -421,8 +439,15 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask): class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask): """Task to get and update the VRRP interface device name from amphora.""" - def execute(self, amphora_index, amphorae, timeout_dict=None): + def execute(self, amphora_index, amphorae, amphorae_status: dict, + timeout_dict=None): amphora_id = amphorae[amphora_index][constants.ID] + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping VRRP interface update because amphora %s " + "is not reachable.", amphora_id) + return None + try: # TODO(johnsom) Optimize this to use the dicts and not need the # DB lookups @@ -478,12 +503,19 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask): """Task to update the VRRP configuration of an amphora.""" def execute(self, loadbalancer_id, amphorae_network_config, amphora_index, - amphorae, amp_vrrp_int, timeout_dict=None): + amphorae, amphorae_status: dict, amp_vrrp_int: Optional[str], + timeout_dict=None): """Execute update_vrrp_conf.""" # Note, we don't want this to cause a revert as it may be used # in a failover flow with both amps failing. Skip it and let # health manager fix it. amphora_id = amphorae[amphora_index][constants.ID] + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping VRRP configuration because amphora %s " + "is not reachable.", amphora_id) + return + try: # TODO(johnsom) Optimize this to use the dicts and not need the # DB lookups @@ -525,10 +557,17 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask): This will reload keepalived if it is already running. 
""" - def execute(self, amphora_index, amphorae, timeout_dict=None): + def execute(self, amphora_index, amphorae, amphorae_status: dict, + timeout_dict=None): # TODO(johnsom) Optimize this to use the dicts and not need the # DB lookups amphora_id = amphorae[amphora_index][constants.ID] + amphora_status = amphorae_status.get(amphora_id, {}) + if amphora_status.get(constants.UNREACHABLE): + LOG.warning("Skipping VRRP start because amphora %s " + "is not reachable.", amphora_id) + return + db_amp = self.amphora_repo.get(db_apis.get_session(), id=amphora_id) try: self.amphora_driver.start_vrrp_service(db_amp, timeout_dict) @@ -592,3 +631,40 @@ class AmphoraConfigUpdate(BaseAmphoraTask): 'update. Please update the amphora image for this ' 'amphora. Skipping.'. format(amphora.get(constants.ID))) + + +class AmphoraeGetConnectivityStatus(BaseAmphoraTask): + """Task that checks amphorae connectivity status. + + Check and return the connectivity status of both amphorae in ACTIVE STANDBY + load balancers + """ + + def execute(self, amphorae: List[dict], new_amphora_id: str, + timeout_dict=None): + amphorae_status = {} + + for amphora in amphorae: + amphora_id = amphora[constants.ID] + amphorae_status[amphora_id] = {} + + session = db_apis.get_session() + with session.begin(): + db_amp = self.amphora_repo.get(session, id=amphora_id) + + try: + # Verify if the amphora is reachable + self.amphora_driver.check(db_amp, timeout_dict=timeout_dict) + except Exception as e: + LOG.exception("Cannot get status for amphora %s", + amphora_id) + # In case it fails and the tested amphora is the newly created + # amphora, it's not a normal error handling, re-raise the + # exception + if amphora_id == new_amphora_id: + raise e + amphorae_status[amphora_id][constants.UNREACHABLE] = True + else: + amphorae_status[amphora_id][constants.UNREACHABLE] = False + + return amphorae_status diff --git a/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py 
b/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py index 8bef00ebea..428ee5aef6 100644 --- a/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py +++ b/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py @@ -75,9 +75,9 @@ class TestHAProxyAmphoraDriver(base.TestCase): mock_api_version.reset_mock() client_mock.reset_mock() - result = self.driver.get_interface_from_ip(amphora_mock, IP_ADDRESS) - - self.assertIsNone(result) + self.assertRaises( + exc.NotFound, + self.driver.get_interface_from_ip, amphora_mock, IP_ADDRESS) mock_api_version.assert_called_once_with(amphora_mock, None) client_mock.get_interface.assert_called_once_with( amphora_mock, IP_ADDRESS, None, log_error=False) diff --git a/octavia/tests/unit/controller/worker/v1/flows/test_amphora_flows.py b/octavia/tests/unit/controller/worker/v1/flows/test_amphora_flows.py index e9f61abf92..77ed34fe89 100644 --- a/octavia/tests/unit/controller/worker/v1/flows/test_amphora_flows.py +++ b/octavia/tests/unit/controller/worker/v1/flows/test_amphora_flows.py @@ -243,6 +243,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORA, amp_flow.provides) self.assertIn(constants.AMPHORA_ID, amp_flow.provides) self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.BASE_PORT, amp_flow.provides) self.assertIn(constants.COMPUTE_ID, amp_flow.provides) @@ -253,7 +254,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.VIP_SG_ID, amp_flow.provides) self.assertEqual(7, len(amp_flow.requires)) - self.assertEqual(13, len(amp_flow.provides)) + self.assertEqual(14, len(amp_flow.provides)) def test_get_failover_flow_standalone(self, mock_get_net_driver): failed_amphora = data_models.Amphora( @@ -276,6 +277,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORA, amp_flow.provides) 
self.assertIn(constants.AMPHORA_ID, amp_flow.provides) self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.BASE_PORT, amp_flow.provides) self.assertIn(constants.COMPUTE_ID, amp_flow.provides) @@ -286,7 +288,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.VIP_SG_ID, amp_flow.provides) self.assertEqual(7, len(amp_flow.requires)) - self.assertEqual(12, len(amp_flow.provides)) + self.assertEqual(13, len(amp_flow.provides)) def test_get_failover_flow_bogus_role(self, mock_get_net_driver): failed_amphora = data_models.Amphora(id=uuidutils.generate_uuid(), @@ -324,12 +326,30 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides) self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides) + self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.provides) self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) + self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires) + + self.assertEqual(3, len(vrrp_subflow.provides)) + self.assertEqual(3, len(vrrp_subflow.requires)) + + def test_get_vrrp_subflow_dont_get_status(self, mock_get_net_driver): + vrrp_subflow = self.AmpFlow.get_vrrp_subflow('123', + get_amphorae_status=False) + + self.assertIsInstance(vrrp_subflow, flow.Flow) + + self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides) + self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides) + + self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) + self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) + self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.requires) self.assertEqual(2, len(vrrp_subflow.provides)) - self.assertEqual(2, len(vrrp_subflow.requires)) + self.assertEqual(3, len(vrrp_subflow.requires)) def 
test_get_vrrp_subflow_dont_create_vrrp_group( self, mock_get_net_driver): @@ -340,12 +360,14 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides) self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides) + self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.provides) self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) + self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires) - self.assertEqual(2, len(vrrp_subflow.provides)) - self.assertEqual(2, len(vrrp_subflow.requires)) + self.assertEqual(3, len(vrrp_subflow.provides)) + self.assertEqual(3, len(vrrp_subflow.requires)) def test_get_post_map_lb_subflow(self, mock_get_net_driver): diff --git a/octavia/tests/unit/controller/worker/v1/flows/test_load_balancer_flows.py b/octavia/tests/unit/controller/worker/v1/flows/test_load_balancer_flows.py index e3ec1d9b5a..fd0c405f55 100644 --- a/octavia/tests/unit/controller/worker/v1/flows/test_load_balancer_flows.py +++ b/octavia/tests/unit/controller/worker/v1/flows/test_load_balancer_flows.py @@ -155,10 +155,16 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires) self.assertIn(constants.UPDATE_DICT, amp_flow.requires) + self.assertIn(constants.AMPHORA_ID, amp_flow.requires) + + self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) + self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides) + self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.LOADBALANCER, amp_flow.provides) - self.assertEqual(4, len(amp_flow.provides)) - self.assertEqual(2, len(amp_flow.requires)) + self.assertEqual(5, len(amp_flow.provides)) + self.assertEqual(3, len(amp_flow.requires)) amp_flow = self.LBFlow.get_post_lb_amp_association_flow( '123', constants.TOPOLOGY_ACTIVE_STANDBY) @@ -167,10 +173,16 @@ class 
TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires) self.assertIn(constants.UPDATE_DICT, amp_flow.requires) + self.assertIn(constants.AMPHORA_ID, amp_flow.requires) + + self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) + self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) + self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides) self.assertIn(constants.LOADBALANCER, amp_flow.provides) - self.assertEqual(4, len(amp_flow.provides)) - self.assertEqual(2, len(amp_flow.requires)) + self.assertEqual(5, len(amp_flow.provides)) + self.assertEqual(3, len(amp_flow.requires)) def test_get_create_load_balancer_flows_single_listeners( self, mock_get_net_driver): @@ -219,6 +231,7 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.LISTENERS, create_flow.provides) self.assertIn(constants.AMPHORA, create_flow.provides) self.assertIn(constants.AMPHORA_ID, create_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, create_flow.provides) self.assertIn(constants.COMPUTE_ID, create_flow.provides) self.assertIn(constants.COMPUTE_OBJ, create_flow.provides) self.assertIn(constants.LOADBALANCER, create_flow.provides) @@ -230,7 +243,7 @@ class TestLoadBalancerFlows(base.TestCase): create_flow.provides) self.assertEqual(6, len(create_flow.requires)) - self.assertEqual(16, len(create_flow.provides), + self.assertEqual(17, len(create_flow.provides), create_flow.provides) def _test_get_failover_LB_flow_single(self, amphorae): @@ -322,6 +335,7 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.AMPHORA, failover_flow.provides) self.assertIn(constants.AMPHORA_ID, failover_flow.provides) self.assertIn(constants.AMPHORAE, failover_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, failover_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, failover_flow.provides) self.assertIn(constants.BASE_PORT, 
failover_flow.provides) @@ -339,7 +353,7 @@ class TestLoadBalancerFlows(base.TestCase): self.assertEqual(6, len(failover_flow.requires), failover_flow.requires) - self.assertEqual(16, len(failover_flow.provides), + self.assertEqual(17, len(failover_flow.provides), failover_flow.provides) def test_get_failover_LB_flow_no_amps_act_stdby(self, mock_get_net_driver): diff --git a/octavia/tests/unit/controller/worker/v1/tasks/test_amphora_driver_tasks.py b/octavia/tests/unit/controller/worker/v1/tasks/test_amphora_driver_tasks.py index 678108448a..b795703302 100644 --- a/octavia/tests/unit/controller/worker/v1/tasks/test_amphora_driver_tasks.py +++ b/octavia/tests/unit/controller/worker/v1/tasks/test_amphora_driver_tasks.py @@ -125,17 +125,37 @@ class TestAmphoraDriverTasks(base.TestCase): mock_amphora_repo_update): mock_lb_repo_get.return_value = _LB_mock + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: False + } + } + amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate() amp_list_update_obj.execute(_load_balancer_mock, 0, - [_amphora_mock], self.timeout_dict) + [_amphora_mock], amphorae_status, + self.timeout_dict) mock_driver.update_amphora_listeners.assert_called_once_with( _LB_mock, _amphora_mock, self.timeout_dict) + # Unreachable amp + mock_driver.reset_mock() + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: True + } + } + amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], + amphorae_status, self.timeout_dict) + mock_driver.update_amphora_listeners.assert_not_called() + + # Test exception mock_driver.update_amphora_listeners.side_effect = Exception('boom') amp_list_update_obj.execute(_load_balancer_mock, 0, - [_amphora_mock], self.timeout_dict) + [_amphora_mock], {}, + self.timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, AMP_ID, status=constants.ERROR) @@ -217,20 +237,38 @@ class TestAmphoraDriverTasks(base.TestCase): # Test no listeners mock_lb.listeners = None - 
listeners_reload_obj.execute(mock_lb, 0, None) + listeners_reload_obj.execute(mock_lb, 0, None, {}) mock_driver.reload.assert_not_called() # Test with listeners + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: False + } + } mock_driver.start.reset_mock() mock_lb.listeners = [mock_listener] listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], + amphorae_status, timeout_dict=self.timeout_dict) mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, self.timeout_dict) + # Unreachable amp + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: True + } + } + mock_driver.reload.reset_mock() + listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], + amphorae_status, + timeout_dict=self.timeout_dict) + mock_driver.reload.assert_not_called() + # Test with reload exception mock_driver.reload.reset_mock() - listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], + listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], {}, timeout_dict=self.timeout_dict) mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, self.timeout_dict) @@ -604,6 +642,11 @@ class TestAmphoraDriverTasks(base.TestCase): _LB_mock.amphorae = _amphorae_mock mock_driver.get_interface_from_ip.side_effect = [FAKE_INTERFACE, Exception('boom')] + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: False + } + } timeout_dict = {constants.CONN_MAX_RETRIES: CONN_MAX_RETRIES, constants.CONN_RETRY_INTERVAL: CONN_RETRY_INTERVAL} @@ -611,16 +654,27 @@ class TestAmphoraDriverTasks(base.TestCase): amphora_update_vrrp_interface_obj = ( amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface()) amphora_update_vrrp_interface_obj.execute( - 0, [_amphora_mock], timeout_dict) + 0, [_amphora_mock], amphorae_status, timeout_dict) mock_driver.get_interface_from_ip.assert_called_once_with( _amphora_mock, _amphora_mock.vrrp_ip, timeout_dict=timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, _amphora_mock.id, 
vrrp_interface=FAKE_INTERFACE) + # Unreachable amp + mock_driver.reset_mock() + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: True + } + } + amphora_update_vrrp_interface_obj.execute( + 0, [_amphora_mock], amphorae_status, timeout_dict) + mock_driver.get_interface_from_ip.assert_not_called() + # Test with an exception mock_amphora_repo_update.reset_mock() amphora_update_vrrp_interface_obj.execute( - 0, [_amphora_mock], timeout_dict) + 0, [_amphora_mock], {}, timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, _amphora_mock.id, status=constants.ERROR) @@ -666,20 +720,41 @@ class TestAmphoraDriverTasks(base.TestCase): mock_driver.update_vrrp_conf.side_effect = [mock.DEFAULT, Exception('boom')] mock_lb_get.return_value = _LB_mock + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: False + } + } + amphora_vrrp_update_obj = ( amphora_driver_tasks.AmphoraIndexVRRPUpdate()) amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config, - 0, [_amphora_mock], 'fakeint0', + 0, [_amphora_mock], amphorae_status, + 'fakeint0', timeout_dict=self.timeout_dict) mock_driver.update_vrrp_conf.assert_called_once_with( _LB_mock, amphorae_network_config, _amphora_mock, self.timeout_dict) + # Unreachable amp + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: True + } + } + mock_amphora_repo_update.reset_mock() + mock_driver.update_vrrp_conf.reset_mock() + amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, + 0, [_amphora_mock], amphorae_status, + None) + mock_driver.update_vrrp_conf.assert_not_called() + # Test with an exception mock_amphora_repo_update.reset_mock() amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config, - 0, [_amphora_mock], 'fakeint0') + 0, [_amphora_mock], {}, + 'fakeint0') mock_amphora_repo_update.assert_called_once_with( _session_mock, _amphora_mock.id, status=constants.ERROR) @@ -706,19 +781,36 @@ class TestAmphoraDriverTasks(base.TestCase): 
mock_listener_repo_get, mock_listener_repo_update, mock_amphora_repo_update): + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: False + } + } + amphora_vrrp_start_obj = ( amphora_driver_tasks.AmphoraIndexVRRPStart()) mock_driver.start_vrrp_service.side_effect = [mock.DEFAULT, Exception('boom')] - amphora_vrrp_start_obj.execute(0, [_amphora_mock], + amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status, timeout_dict=self.timeout_dict) mock_driver.start_vrrp_service.assert_called_once_with( _amphora_mock, self.timeout_dict) + # Unreachable amp + mock_driver.start_vrrp_service.reset_mock() + amphorae_status = { + _amphora_mock.id: { + constants.UNREACHABLE: True + } + } + amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status, + timeout_dict=self.timeout_dict) + mock_driver.start_vrrp_service.assert_not_called() + # Test with a start exception mock_driver.start_vrrp_service.reset_mock() - amphora_vrrp_start_obj.execute(0, [_amphora_mock], + amphora_vrrp_start_obj.execute(0, [_amphora_mock], {}, timeout_dict=self.timeout_dict) mock_driver.start_vrrp_service.assert_called_once_with( _amphora_mock, self.timeout_dict) @@ -790,3 +882,75 @@ class TestAmphoraDriverTasks(base.TestCase): self.assertRaises(driver_except.TimeOutException, amp_config_update_obj.execute, _amphora_mock, flavor) + + @mock.patch('octavia.db.repositories.AmphoraRepository.get') + def test_amphorae_get_connectivity_status(self, + mock_amphora_repo_get, + mock_driver, + mock_generate_uuid, + mock_log, + mock_get_session, + mock_listener_repo_get, + mock_listener_repo_update, + mock_amphora_repo_update): + amphora1_mock = mock.MagicMock() + amphora1_mock.id = 'id1' + amphora2_mock = mock.MagicMock() + amphora2_mock.id = 'id2' + db_amphora1_mock = mock.Mock() + db_amphora2_mock = mock.Mock() + + amp_get_connectivity_status = ( + amphora_driver_tasks.AmphoraeGetConnectivityStatus()) + + # All amphorae reachable + mock_amphora_repo_get.side_effect = [ + 
db_amphora1_mock, + db_amphora2_mock] + mock_driver.check.return_value = None + + ret = amp_get_connectivity_status.execute( + [amphora1_mock, amphora2_mock], + amphora1_mock.id, + timeout_dict=self.timeout_dict) + mock_driver.check.assert_has_calls( + [mock.call(db_amphora1_mock, timeout_dict=self.timeout_dict), + mock.call(db_amphora2_mock, timeout_dict=self.timeout_dict)]) + self.assertFalse( + ret[amphora1_mock.id][constants.UNREACHABLE]) + self.assertFalse( + ret[amphora2_mock.id][constants.UNREACHABLE]) + + # amphora1 unreachable + mock_driver.check.reset_mock() + mock_amphora_repo_get.side_effect = [ + db_amphora1_mock, + db_amphora2_mock] + mock_driver.check.side_effect = [ + driver_except.TimeOutException, None] + self.assertRaises(driver_except.TimeOutException, + amp_get_connectivity_status.execute, + [amphora1_mock, amphora2_mock], + amphora1_mock.id, + timeout_dict=self.timeout_dict) + mock_driver.check.assert_called_with( + db_amphora1_mock, timeout_dict=self.timeout_dict) + + # amphora2 unreachable + mock_driver.check.reset_mock() + mock_amphora_repo_get.side_effect = [ + db_amphora1_mock, + db_amphora2_mock] + mock_driver.check.side_effect = [ + None, driver_except.TimeOutException] + ret = amp_get_connectivity_status.execute( + [amphora1_mock, amphora2_mock], + amphora1_mock.id, + timeout_dict=self.timeout_dict) + mock_driver.check.assert_has_calls( + [mock.call(db_amphora1_mock, timeout_dict=self.timeout_dict), + mock.call(db_amphora2_mock, timeout_dict=self.timeout_dict)]) + self.assertFalse( + ret[amphora1_mock.id][constants.UNREACHABLE]) + self.assertTrue( + ret[amphora2_mock.id][constants.UNREACHABLE]) diff --git a/octavia/tests/unit/controller/worker/v2/flows/test_amphora_flows.py b/octavia/tests/unit/controller/worker/v2/flows/test_amphora_flows.py index feeeb17910..3f5ebd0917 100644 --- a/octavia/tests/unit/controller/worker/v2/flows/test_amphora_flows.py +++ b/octavia/tests/unit/controller/worker/v2/flows/test_amphora_flows.py @@ -286,6 
+286,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORA, amp_flow.provides) self.assertIn(constants.AMPHORA_ID, amp_flow.provides) self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.BASE_PORT, amp_flow.provides) self.assertIn(constants.COMPUTE_ID, amp_flow.provides) @@ -296,7 +297,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.VIP_SG_ID, amp_flow.provides) self.assertEqual(7, len(amp_flow.requires)) - self.assertEqual(13, len(amp_flow.provides)) + self.assertEqual(14, len(amp_flow.provides)) def test_get_failover_flow_standalone(self, mock_get_net_driver): failed_amphora = data_models.Amphora( @@ -320,6 +321,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORA, amp_flow.provides) self.assertIn(constants.AMPHORA_ID, amp_flow.provides) self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.BASE_PORT, amp_flow.provides) self.assertIn(constants.COMPUTE_ID, amp_flow.provides) @@ -330,7 +332,7 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.VIP_SG_ID, amp_flow.provides) self.assertEqual(7, len(amp_flow.requires)) - self.assertEqual(12, len(amp_flow.provides)) + self.assertEqual(13, len(amp_flow.provides)) def test_get_failover_flow_bogus_role(self, mock_get_net_driver): failed_amphora = data_models.Amphora(id=uuidutils.generate_uuid(), @@ -368,12 +370,30 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides) self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides) + self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.provides) self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) 
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) + self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires) + + self.assertEqual(3, len(vrrp_subflow.provides)) + self.assertEqual(3, len(vrrp_subflow.requires)) + + def test_get_vrrp_subflow_dont_get_status(self, mock_get_net_driver): + vrrp_subflow = self.AmpFlow.get_vrrp_subflow('123', + get_amphorae_status=False) + + self.assertIsInstance(vrrp_subflow, flow.Flow) + + self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides) + self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides) + + self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) + self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) + self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.requires) self.assertEqual(2, len(vrrp_subflow.provides)) - self.assertEqual(2, len(vrrp_subflow.requires)) + self.assertEqual(3, len(vrrp_subflow.requires)) def test_get_vrrp_subflow_dont_create_vrrp_group( self, mock_get_net_driver): @@ -384,12 +404,14 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides) self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides) + self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.provides) self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) + self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires) - self.assertEqual(2, len(vrrp_subflow.provides)) - self.assertEqual(2, len(vrrp_subflow.requires)) + self.assertEqual(3, len(vrrp_subflow.provides)) + self.assertEqual(3, len(vrrp_subflow.requires)) def test_update_amphora_config_flow(self, mock_get_net_driver): diff --git a/octavia/tests/unit/controller/worker/v2/flows/test_load_balancer_flows.py b/octavia/tests/unit/controller/worker/v2/flows/test_load_balancer_flows.py index ee343304c5..df7cec6d2f 100644 --- a/octavia/tests/unit/controller/worker/v2/flows/test_load_balancer_flows.py +++ 
b/octavia/tests/unit/controller/worker/v2/flows/test_load_balancer_flows.py @@ -174,14 +174,16 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires) self.assertIn(constants.UPDATE_DICT, amp_flow.requires) + self.assertIn(constants.AMPHORA_ID, amp_flow.requires) self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.LOADBALANCER, amp_flow.provides) - self.assertEqual(2, len(amp_flow.requires), amp_flow.requires) - self.assertEqual(4, len(amp_flow.provides), amp_flow.provides) + self.assertEqual(3, len(amp_flow.requires), amp_flow.requires) + self.assertEqual(5, len(amp_flow.provides), amp_flow.provides) amp_flow = self.LBFlow.get_post_lb_amp_association_flow( '123', constants.TOPOLOGY_ACTIVE_STANDBY) @@ -190,14 +192,16 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires) self.assertIn(constants.UPDATE_DICT, amp_flow.requires) + self.assertIn(constants.AMPHORA_ID, amp_flow.requires) self.assertIn(constants.AMPHORAE, amp_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides) self.assertIn(constants.LOADBALANCER, amp_flow.provides) - self.assertEqual(2, len(amp_flow.requires), amp_flow.requires) - self.assertEqual(4, len(amp_flow.provides), amp_flow.provides) + self.assertEqual(3, len(amp_flow.requires), amp_flow.requires) + self.assertEqual(5, len(amp_flow.provides), amp_flow.provides) def test_get_create_load_balancer_flows_single_listeners( self, mock_get_net_driver): @@ -255,6 +259,7 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.AMPHORA_ID, create_flow.provides) 
self.assertIn(constants.AMPHORA_NETWORK_CONFIG, create_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, create_flow.provides) + self.assertIn(constants.AMPHORAE_STATUS, create_flow.provides) self.assertIn(constants.COMPUTE_ID, create_flow.provides) self.assertIn(constants.COMPUTE_OBJ, create_flow.provides) self.assertIn(constants.DELTAS, create_flow.provides) @@ -265,7 +270,7 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.VIP, create_flow.provides) self.assertEqual(6, len(create_flow.requires), create_flow.requires) - self.assertEqual(16, len(create_flow.provides), + self.assertEqual(17, len(create_flow.provides), create_flow.provides) def _test_get_failover_LB_flow_single(self, amphorae): diff --git a/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py b/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py index dd3dac3568..a04a127c84 100644 --- a/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py +++ b/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py @@ -132,17 +132,35 @@ class TestAmphoraDriverTasks(base.TestCase): mock_amphora_repo_get.return_value = _db_amphora_mock mock_lb_get.return_value = _db_load_balancer_mock + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: False + } + } + amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate() amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], - self.timeout_dict) + amphorae_status, self.timeout_dict) mock_driver.update_amphora_listeners.assert_called_once_with( _db_load_balancer_mock, _db_amphora_mock, self.timeout_dict) + # Unreachable amp + mock_driver.reset_mock() + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: True + } + } + amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], + amphorae_status, self.timeout_dict) + mock_driver.update_amphora_listeners.assert_not_called() + + # Test exception 
mock_driver.update_amphora_listeners.side_effect = Exception('boom') - amp_list_update_obj.execute(_LB_mock, 0, - [_amphora_mock], self.timeout_dict) + amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], {}, + self.timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, AMP_ID, status=constants.ERROR) @@ -194,37 +212,54 @@ class TestAmphoraDriverTasks(base.TestCase): mock_driver, mock_generate_uuid, mock_log, mock_get_session, mock_listener_repo_get, mock_listener_repo_update, mock_amphora_repo_get, mock_amphora_repo_update): - amphora_mock = mock.MagicMock() listeners_reload_obj = ( amphora_driver_tasks.AmphoraIndexListenersReload()) mock_lb = mock.MagicMock() mock_listener = mock.MagicMock() mock_listener.id = '12345' - mock_amphora_repo_get.return_value = amphora_mock + mock_amphora_repo_get.return_value = _amphora_mock mock_lb_repo_get.return_value = mock_lb mock_driver.reload.side_effect = [mock.DEFAULT, Exception('boom')] # Test no listeners mock_lb.listeners = None - listeners_reload_obj.execute(mock_lb, 0, None) + listeners_reload_obj.execute(mock_lb, 0, None, {}) mock_driver.reload.assert_not_called() # Test with listeners - mock_driver.start.reset_mock() + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: False + } + } + mock_driver.reload.reset_mock() mock_lb.listeners = [mock_listener] - listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], + listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], + amphorae_status, timeout_dict=self.timeout_dict) - mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, + mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock, self.timeout_dict) + # Unreachable amp + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: True + } + } + mock_driver.reload.reset_mock() + listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], + amphorae_status, + timeout_dict=self.timeout_dict) + 
mock_driver.reload.assert_not_called() + # Test with reload exception mock_driver.reload.reset_mock() - listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], + listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], {}, timeout_dict=self.timeout_dict) - mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, + mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock, self.timeout_dict) mock_amphora_repo_update.assert_called_once_with( - _session_mock, amphora_mock[constants.ID], + _session_mock, _amphora_mock[constants.ID], status=constants.ERROR) @mock.patch('octavia.controller.worker.task_utils.TaskUtils.' @@ -728,6 +763,11 @@ class TestAmphoraDriverTasks(base.TestCase): FAKE_INTERFACE = 'fake0' mock_driver.get_interface_from_ip.side_effect = [FAKE_INTERFACE, Exception('boom')] + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: False + } + } timeout_dict = {constants.CONN_MAX_RETRIES: CONN_MAX_RETRIES, constants.CONN_RETRY_INTERVAL: CONN_RETRY_INTERVAL} @@ -735,17 +775,28 @@ class TestAmphoraDriverTasks(base.TestCase): amphora_update_vrrp_interface_obj = ( amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface()) amphora_update_vrrp_interface_obj.execute( - 0, [_amphora_mock], timeout_dict) + 0, [_amphora_mock], amphorae_status, timeout_dict) mock_driver.get_interface_from_ip.assert_called_once_with( _db_amphora_mock, _db_amphora_mock.vrrp_ip, timeout_dict=timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, _db_amphora_mock.id, vrrp_interface=FAKE_INTERFACE) + # Unreachable amp + mock_driver.reset_mock() + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: True + } + } + amphora_update_vrrp_interface_obj.execute( + 0, [_amphora_mock], amphorae_status, timeout_dict) + mock_driver.get_interface_from_ip.assert_not_called() + # Test with an exception mock_amphora_repo_update.reset_mock() amphora_update_vrrp_interface_obj.execute( - 0, [_amphora_mock], 
timeout_dict) + 0, [_amphora_mock], {}, timeout_dict) mock_amphora_repo_update.assert_called_once_with( _session_mock, _db_amphora_mock.id, status=constants.ERROR) @@ -796,20 +847,40 @@ class TestAmphoraDriverTasks(base.TestCase): Exception('boom')] mock_lb_get.return_value = _db_load_balancer_mock mock_amphora_repo_get.return_value = _db_amphora_mock + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: False + } + } + amphora_vrrp_update_obj = ( amphora_driver_tasks.AmphoraIndexVRRPUpdate()) amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, - 0, [_amphora_mock], 'fakeint0', + 0, [_amphora_mock], amphorae_status, + 'fakeint0', timeout_dict=self.timeout_dict) mock_driver.update_vrrp_conf.assert_called_once_with( _db_load_balancer_mock, amphorae_network_config, _db_amphora_mock, self.timeout_dict) + # Unreachable amp + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: True + } + } + mock_amphora_repo_update.reset_mock() + mock_driver.update_vrrp_conf.reset_mock() + amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, + 0, [_amphora_mock], amphorae_status, + None) + mock_driver.update_vrrp_conf.assert_not_called() + # Test with an exception mock_amphora_repo_update.reset_mock() amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, - 0, [_amphora_mock], 'fakeint0') + 0, [_amphora_mock], {}, 'fakeint0') mock_amphora_repo_update.assert_called_once_with( _session_mock, _db_amphora_mock.id, status=constants.ERROR) @@ -840,19 +911,36 @@ class TestAmphoraDriverTasks(base.TestCase): mock_amphora_repo_get, mock_amphora_repo_update): mock_amphora_repo_get.return_value = _db_amphora_mock + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: False + } + } + amphora_vrrp_start_obj = ( amphora_driver_tasks.AmphoraIndexVRRPStart()) mock_driver.start_vrrp_service.side_effect = [mock.DEFAULT, Exception('boom')] - amphora_vrrp_start_obj.execute(0, [_amphora_mock], + 
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status, timeout_dict=self.timeout_dict) mock_driver.start_vrrp_service.assert_called_once_with( _db_amphora_mock, self.timeout_dict) + # Unreachable amp + mock_driver.start_vrrp_service.reset_mock() + amphorae_status = { + _amphora_mock[constants.ID]: { + constants.UNREACHABLE: True + } + } + amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status, + timeout_dict=self.timeout_dict) + mock_driver.start_vrrp_service.assert_not_called() + # Test with a start exception mock_driver.start_vrrp_service.reset_mock() - amphora_vrrp_start_obj.execute(0, [_amphora_mock], + amphora_vrrp_start_obj.execute(0, [_amphora_mock], {}, timeout_dict=self.timeout_dict) mock_driver.start_vrrp_service.assert_called_once_with( _db_amphora_mock, self.timeout_dict) @@ -930,3 +1018,74 @@ class TestAmphoraDriverTasks(base.TestCase): self.assertRaises(driver_except.TimeOutException, amp_config_update_obj.execute, _amphora_mock, flavor) + + def test_amphorae_get_connectivity_status(self, + mock_driver, + mock_generate_uuid, + mock_log, + mock_get_session, + mock_listener_repo_get, + mock_listener_repo_update, + mock_amphora_repo_get, + mock_amphora_repo_update): + amphora1_mock = mock.MagicMock() + amphora1_mock[constants.ID] = 'id1' + amphora2_mock = mock.MagicMock() + amphora2_mock[constants.ID] = 'id2' + db_amphora1_mock = mock.Mock() + db_amphora2_mock = mock.Mock() + + amp_get_connectivity_status = ( + amphora_driver_tasks.AmphoraeGetConnectivityStatus()) + + # All amphorae reachable + mock_amphora_repo_get.side_effect = [ + db_amphora1_mock, + db_amphora2_mock] + mock_driver.check.return_value = None + + ret = amp_get_connectivity_status.execute( + [amphora1_mock, amphora2_mock], + amphora1_mock[constants.ID], + timeout_dict=self.timeout_dict) + mock_driver.check.assert_has_calls( + [mock.call(db_amphora1_mock, timeout_dict=self.timeout_dict), + mock.call(db_amphora2_mock, timeout_dict=self.timeout_dict)]) + 
self.assertFalse( + ret[amphora1_mock[constants.ID]][constants.UNREACHABLE]) + self.assertFalse( + ret[amphora2_mock[constants.ID]][constants.UNREACHABLE]) + + # amphora1 unreachable + mock_driver.check.reset_mock() + mock_amphora_repo_get.side_effect = [ + db_amphora1_mock, + db_amphora2_mock] + mock_driver.check.side_effect = [ + driver_except.TimeOutException, None] + self.assertRaises(driver_except.TimeOutException, + amp_get_connectivity_status.execute, + [amphora1_mock, amphora2_mock], + amphora1_mock[constants.ID], + timeout_dict=self.timeout_dict) + mock_driver.check.assert_called_with( + db_amphora1_mock, timeout_dict=self.timeout_dict) + + # amphora2 unreachable + mock_driver.check.reset_mock() + mock_amphora_repo_get.side_effect = [ + db_amphora1_mock, + db_amphora2_mock] + mock_driver.check.side_effect = [ + None, driver_except.TimeOutException] + ret = amp_get_connectivity_status.execute( + [amphora1_mock, amphora2_mock], + amphora1_mock[constants.ID], + timeout_dict=self.timeout_dict) + mock_driver.check.assert_has_calls( + [mock.call(db_amphora1_mock, timeout_dict=self.timeout_dict), + mock.call(db_amphora2_mock, timeout_dict=self.timeout_dict)]) + self.assertFalse( + ret[amphora1_mock[constants.ID]][constants.UNREACHABLE]) + self.assertTrue( + ret[amphora2_mock[constants.ID]][constants.UNREACHABLE]) diff --git a/releasenotes/notes/reduce-duration-failover-636032433984d911.yaml b/releasenotes/notes/reduce-duration-failover-636032433984d911.yaml new file mode 100644 index 0000000000..21d5718d6e --- /dev/null +++ b/releasenotes/notes/reduce-duration-failover-636032433984d911.yaml @@ -0,0 +1,7 @@ +--- +fixes: + - | + Reduce the duration of the failovers of ACTIVE_STANDBY load balancers. Many + updates of an unreachable amphora may have been attempted during a + failover; now, if an amphora is not reachable at the first update, the other + updates are skipped.