Reduce duration of failovers with amphora in ERROR

In the failover flow, there are multiple tasks for the configuration of
VRRP for the other amphorae of the load balancer, but during outage the
other amps may not be available. To prevent the tasks from attempting
connections to unreachable amphorae, we can detect in the first task
that an amp is unreachable and pass this information to the other tasks.

Those connection attempts could have taken a lot of time, between 15 min
and 40 min depending on the configuration of Octavia and the provider
driver (amphorav1 or amphorav2)

Closes-Bug: #2033894

Note: stable/2023.1 and older, the patch also includes modifications in
      octavia/controller/worker/v1/

Conflicts:
	octavia/controller/worker/v2/tasks/amphora_driver_tasks.py

Change-Id: Ib33a0b8d2875e4ff97c65933fe9360bb06994d32
(cherry picked from commit ca70587f0b)
(cherry picked from commit 6fdc2079da)
(cherry picked from commit 97b1b8387e)
This commit is contained in:
Gregory Thiemonge 2023-09-01 09:08:37 -04:00
parent 1dadaff530
commit dd849b4c5c
18 changed files with 745 additions and 102 deletions

View File

@ -14,6 +14,9 @@
# under the License. # under the License.
import abc import abc
from typing import Optional
from octavia.db import models as db_models
class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta): class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta):
@ -237,6 +240,19 @@ class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta):
:type timeout_dict: dict :type timeout_dict: dict
""" """
@abc.abstractmethod
def check(self, amphora: db_models.Amphora,
timeout_dict: Optional[dict] = None):
"""Check connectivity to the amphora.
:param amphora: The amphora to query.
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
:raises TimeOutException: The amphora didn't reply
"""
class VRRPDriverMixin(object, metaclass=abc.ABCMeta): class VRRPDriverMixin(object, metaclass=abc.ABCMeta):
"""Abstract mixin class for VRRP support in loadbalancer amphorae """Abstract mixin class for VRRP support in loadbalancer amphorae

View File

@ -17,6 +17,7 @@ import hashlib
import os import os
import ssl import ssl
import time import time
from typing import Optional
import warnings import warnings
from oslo_context import context as oslo_context from oslo_context import context as oslo_context
@ -38,6 +39,7 @@ from octavia.common.jinja.lvs import jinja_cfg as jinja_udp_cfg
from octavia.common.tls_utils import cert_parser from octavia.common.tls_utils import cert_parser
from octavia.common import utils from octavia.common import utils
from octavia.db import api as db_apis from octavia.db import api as db_apis
from octavia.db import models as db_models
from octavia.db import repositories as repo from octavia.db import repositories as repo
from octavia.network import data_models as network_models from octavia.network import data_models as network_models
@ -115,6 +117,11 @@ class HaproxyAmphoraLoadBalancerDriver(
amphora.id, amphora.api_version) amphora.id, amphora.api_version)
return list(map(int, amphora.api_version.split('.'))) return list(map(int, amphora.api_version.split('.')))
def check(self, amphora: db_models.Amphora,
timeout_dict: Optional[dict] = None):
"""Check connectivity to the amphora."""
self._populate_amphora_api_version(amphora, timeout_dict)
def update_amphora_listeners(self, loadbalancer, amphora, def update_amphora_listeners(self, loadbalancer, amphora,
timeout_dict=None): timeout_dict=None):
"""Update the amphora with a new configuration. """Update the amphora with a new configuration.
@ -649,15 +656,15 @@ class HaproxyAmphoraLoadBalancerDriver(
req_read_timeout, conn_max_retries, req_read_timeout, conn_max_retries,
conn_retry_interval conn_retry_interval
:type timeout_dict: dict :type timeout_dict: dict
:returns: None if not found, the interface name string if found. :returns: the interface name string if found.
:raises octavia.amphorae.drivers.haproxy.exceptions.NotFound:
No interface found on the amphora
:raises TimeOutException: The amphora didn't reply
""" """
try: self._populate_amphora_api_version(amphora, timeout_dict)
self._populate_amphora_api_version(amphora, timeout_dict) response_json = self.clients[amphora.api_version].get_interface(
response_json = self.clients[amphora.api_version].get_interface( amphora, ip_address, timeout_dict, log_error=False)
amphora, ip_address, timeout_dict, log_error=False) return response_json.get('interface', None)
return response_json.get('interface', None)
except (exc.NotFound, driver_except.TimeOutException):
return None
# Check a custom hostname # Check a custom hostname

View File

@ -198,3 +198,6 @@ class NoopAmphoraLoadBalancerDriver(
def reload_vrrp_service(self, loadbalancer): def reload_vrrp_service(self, loadbalancer):
pass pass
def check(self, amphora, timeout_dict=None):
pass

View File

@ -312,6 +312,7 @@ AMPHORA_INDEX = 'amphora_index'
AMPHORA_NETWORK_CONFIG = 'amphora_network_config' AMPHORA_NETWORK_CONFIG = 'amphora_network_config'
AMPHORAE = 'amphorae' AMPHORAE = 'amphorae'
AMPHORAE_NETWORK_CONFIG = 'amphorae_network_config' AMPHORAE_NETWORK_CONFIG = 'amphorae_network_config'
AMPHORAE_STATUS = 'amphorae_status'
AMPS_DATA = 'amps_data' AMPS_DATA = 'amps_data'
ANTI_AFFINITY = 'anti-affinity' ANTI_AFFINITY = 'anti-affinity'
ATTEMPT_NUMBER = 'attempt_number' ATTEMPT_NUMBER = 'attempt_number'
@ -383,6 +384,7 @@ MESSAGE = 'message'
NAME = 'name' NAME = 'name'
NETWORK = 'network' NETWORK = 'network'
NETWORK_ID = 'network_id' NETWORK_ID = 'network_id'
NEW_AMPHORA_ID = 'new_amphora_id'
NEXTHOP = 'nexthop' NEXTHOP = 'nexthop'
NICS = 'nics' NICS = 'nics'
OBJECT = 'object' OBJECT = 'object'
@ -429,6 +431,7 @@ TLS_CERTIFICATE_ID = 'tls_certificate_id'
TLS_CONTAINER_ID = 'tls_container_id' TLS_CONTAINER_ID = 'tls_container_id'
TOPOLOGY = 'topology' TOPOLOGY = 'topology'
TOTAL_CONNECTIONS = 'total_connections' TOTAL_CONNECTIONS = 'total_connections'
UNREACHABLE = 'unreachable'
UPDATED_AT = 'updated_at' UPDATED_AT = 'updated_at'
UPDATE_DICT = 'update_dict' UPDATE_DICT = 'update_dict'
UPDATED_PORTS = 'updated_ports' UPDATED_PORTS = 'updated_ports'
@ -556,6 +559,7 @@ ADMIN_DOWN_PORT = 'admin-down-port'
AMPHORA_POST_VIP_PLUG = 'amphora-post-vip-plug' AMPHORA_POST_VIP_PLUG = 'amphora-post-vip-plug'
AMPHORA_RELOAD_LISTENER = 'amphora-reload-listener' AMPHORA_RELOAD_LISTENER = 'amphora-reload-listener'
AMPHORA_TO_ERROR_ON_REVERT = 'amphora-to-error-on-revert' AMPHORA_TO_ERROR_ON_REVERT = 'amphora-to-error-on-revert'
AMPHORAE_GET_CONNECTIVITY_STATUS = 'amphorae-get-connectivity-status'
AMPHORAE_POST_NETWORK_PLUG = 'amphorae-post-network-plug' AMPHORAE_POST_NETWORK_PLUG = 'amphorae-post-network-plug'
ATTACH_PORT = 'attach-port' ATTACH_PORT = 'attach-port'
CALCULATE_AMPHORA_DELTA = 'calculate-amphora-delta' CALCULATE_AMPHORA_DELTA = 'calculate-amphora-delta'

View File

@ -240,7 +240,8 @@ class AmphoraFlows(object):
return delete_amphora_flow return delete_amphora_flow
def get_vrrp_subflow(self, prefix, timeout_dict=None, def get_vrrp_subflow(self, prefix, timeout_dict=None,
create_vrrp_group=True): create_vrrp_group=True,
get_amphorae_status=True):
sf_name = prefix + '-' + constants.GET_VRRP_SUBFLOW sf_name = prefix + '-' + constants.GET_VRRP_SUBFLOW
vrrp_subflow = linear_flow.Flow(sf_name) vrrp_subflow = linear_flow.Flow(sf_name)
@ -256,6 +257,17 @@ class AmphoraFlows(object):
requires=constants.LOADBALANCER_ID, requires=constants.LOADBALANCER_ID,
provides=constants.AMPHORAE_NETWORK_CONFIG)) provides=constants.AMPHORAE_NETWORK_CONFIG))
if get_amphorae_status:
# Get the amphorae_status dict in case the caller hasn't fetched
# it yet.
vrrp_subflow.add(
amphora_driver_tasks.AmphoraeGetConnectivityStatus(
name=constants.AMPHORAE_GET_CONNECTIVITY_STATUS,
requires=constants.AMPHORAE,
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.TIMEOUT_DICT: timeout_dict},
provides=constants.AMPHORAE_STATUS))
# VRRP update needs to be run on all amphora to update # VRRP update needs to be run on all amphora to update
# their peer configurations. So parallelize this with an # their peer configurations. So parallelize this with an
# unordered subflow. # unordered subflow.
@ -266,7 +278,7 @@ class AmphoraFlows(object):
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF, name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF,
requires=constants.AMPHORAE, requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
inject={constants.AMPHORA_INDEX: 0, inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict}, constants.TIMEOUT_DICT: timeout_dict},
provides=constants.AMP_VRRP_INT)) provides=constants.AMP_VRRP_INT))
@ -275,13 +287,13 @@ class AmphoraFlows(object):
name=sf_name + '-0-' + constants.AMP_VRRP_UPDATE, name=sf_name + '-0-' + constants.AMP_VRRP_UPDATE,
requires=(constants.LOADBALANCER_ID, requires=(constants.LOADBALANCER_ID,
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
constants.AMP_VRRP_INT), constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT),
inject={constants.AMPHORA_INDEX: 0, inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
name=sf_name + '-0-' + constants.AMP_VRRP_START, name=sf_name + '-0-' + constants.AMP_VRRP_START,
requires=constants.AMPHORAE, requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
inject={constants.AMPHORA_INDEX: 0, inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
@ -289,7 +301,7 @@ class AmphoraFlows(object):
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF, name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF,
requires=constants.AMPHORAE, requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
inject={constants.AMPHORA_INDEX: 1, inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict}, constants.TIMEOUT_DICT: timeout_dict},
provides=constants.AMP_VRRP_INT)) provides=constants.AMP_VRRP_INT))
@ -298,12 +310,12 @@ class AmphoraFlows(object):
name=sf_name + '-1-' + constants.AMP_VRRP_UPDATE, name=sf_name + '-1-' + constants.AMP_VRRP_UPDATE,
requires=(constants.LOADBALANCER_ID, requires=(constants.LOADBALANCER_ID,
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
constants.AMP_VRRP_INT), constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT),
inject={constants.AMPHORA_INDEX: 1, inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
name=sf_name + '-1-' + constants.AMP_VRRP_START, name=sf_name + '-1-' + constants.AMP_VRRP_START,
requires=constants.AMPHORAE, requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
inject={constants.AMPHORA_INDEX: 1, inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
@ -551,6 +563,14 @@ class AmphoraFlows(object):
constants.CONN_RETRY_INTERVAL: constants.CONN_RETRY_INTERVAL:
CONF.haproxy_amphora.active_connection_retry_interval} CONF.haproxy_amphora.active_connection_retry_interval}
failover_amp_flow.add(
amphora_driver_tasks.AmphoraeGetConnectivityStatus(
name=constants.AMPHORAE_GET_CONNECTIVITY_STATUS,
requires=constants.AMPHORAE,
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.TIMEOUT_DICT: timeout_dict},
provides=constants.AMPHORAE_STATUS))
# Listeners update needs to be run on all amphora to update # Listeners update needs to be run on all amphora to update
# their peer configurations. So parallelize this with an # their peer configurations. So parallelize this with an
# unordered subflow. # unordered subflow.
@ -561,7 +581,8 @@ class AmphoraFlows(object):
update_amps_subflow.add( update_amps_subflow.add(
amphora_driver_tasks.AmphoraIndexListenerUpdate( amphora_driver_tasks.AmphoraIndexListenerUpdate(
name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE, name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE,
requires=(constants.LOADBALANCER, constants.AMPHORAE), requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS),
inject={constants.AMPHORA_INDEX: amp_index, inject={constants.AMPHORA_INDEX: amp_index,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
@ -571,7 +592,8 @@ class AmphoraFlows(object):
if lb_amp_count == 2: if lb_amp_count == 2:
failover_amp_flow.add( failover_amp_flow.add(
self.get_vrrp_subflow(constants.GET_VRRP_SUBFLOW, self.get_vrrp_subflow(constants.GET_VRRP_SUBFLOW,
timeout_dict, create_vrrp_group=False)) timeout_dict, create_vrrp_group=False,
get_amphorae_status=False))
# Reload the listener. This needs to be done here because # Reload the listener. This needs to be done here because
# it will create the required haproxy check scripts for # it will create the required haproxy check scripts for
@ -587,7 +609,8 @@ class AmphoraFlows(object):
amphora_driver_tasks.AmphoraIndexListenersReload( amphora_driver_tasks.AmphoraIndexListenersReload(
name=(str(amp_index) + '-' + name=(str(amp_index) + '-' +
constants.AMPHORA_RELOAD_LISTENER), constants.AMPHORA_RELOAD_LISTENER),
requires=(constants.LOADBALANCER, constants.AMPHORAE), requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS),
inject={constants.AMPHORA_INDEX: amp_index, inject={constants.AMPHORA_INDEX: amp_index,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))

View File

@ -621,6 +621,14 @@ class LoadBalancerFlows(object):
requires=constants.LOADBALANCER_ID, requires=constants.LOADBALANCER_ID,
provides=constants.AMPHORAE)) provides=constants.AMPHORAE))
failover_LB_flow.add(
amphora_driver_tasks.AmphoraeGetConnectivityStatus(
name=(new_amp_role + '-' +
constants.AMPHORAE_GET_CONNECTIVITY_STATUS),
requires=constants.AMPHORAE,
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
provides=constants.AMPHORAE_STATUS))
# Listeners update needs to be run on all amphora to update # Listeners update needs to be run on all amphora to update
# their peer configurations. So parallelize this with an # their peer configurations. So parallelize this with an
# unordered subflow. # unordered subflow.
@ -635,14 +643,16 @@ class LoadBalancerFlows(object):
amphora_driver_tasks.AmphoraIndexListenerUpdate( amphora_driver_tasks.AmphoraIndexListenerUpdate(
name=(constants.AMPHORA + '-0-' + name=(constants.AMPHORA + '-0-' +
constants.AMP_LISTENER_UPDATE), constants.AMP_LISTENER_UPDATE),
requires=(constants.LOADBALANCER, constants.AMPHORAE), requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS),
inject={constants.AMPHORA_INDEX: 0, inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
update_amps_subflow.add( update_amps_subflow.add(
amphora_driver_tasks.AmphoraIndexListenerUpdate( amphora_driver_tasks.AmphoraIndexListenerUpdate(
name=(constants.AMPHORA + '-1-' + name=(constants.AMPHORA + '-1-' +
constants.AMP_LISTENER_UPDATE), constants.AMP_LISTENER_UPDATE),
requires=(constants.LOADBALANCER, constants.AMPHORAE), requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS),
inject={constants.AMPHORA_INDEX: 1, inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
@ -651,7 +661,8 @@ class LoadBalancerFlows(object):
# Configure and enable keepalived in the amphora # Configure and enable keepalived in the amphora
failover_LB_flow.add(self.amp_flows.get_vrrp_subflow( failover_LB_flow.add(self.amp_flows.get_vrrp_subflow(
new_amp_role + '-' + constants.GET_VRRP_SUBFLOW, new_amp_role + '-' + constants.GET_VRRP_SUBFLOW,
timeout_dict, create_vrrp_group=False)) timeout_dict, create_vrrp_group=False,
get_amphorae_status=False))
# #### End of standby #### # #### End of standby ####

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
# #
from typing import List
from typing import Optional
from cryptography import fernet from cryptography import fernet
from oslo_config import cfg from oslo_config import cfg
@ -73,10 +75,18 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask):
"""Task to update the listeners on one amphora.""" """Task to update the listeners on one amphora."""
def execute(self, loadbalancer, amphora_index, amphorae, def execute(self, loadbalancer, amphora_index, amphorae,
timeout_dict=None): amphorae_status: dict, timeout_dict=None):
# Note, we don't want this to cause a revert as it may be used # Note, we don't want this to cause a revert as it may be used
# in a failover flow with both amps failing. Skip it and let # in a failover flow with both amps failing. Skip it and let
# health manager fix it. # health manager fix it.
amphora_id = amphorae[amphora_index].id
amphora_status = amphorae_status.get(amphora_id, {})
if amphora_status.get(constants.UNREACHABLE):
LOG.warning("Skipping listener update because amphora %s "
"is not reachable.", amphora_id)
return
try: try:
# Make sure we have a fresh load balancer object # Make sure we have a fresh load balancer object
loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(), loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(),
@ -84,7 +94,6 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask):
self.amphora_driver.update_amphora_listeners( self.amphora_driver.update_amphora_listeners(
loadbalancer, amphorae[amphora_index], timeout_dict) loadbalancer, amphorae[amphora_index], timeout_dict)
except Exception as e: except Exception as e:
amphora_id = amphorae[amphora_index].id
LOG.error('Failed to update listeners on amphora %s. Skipping ' LOG.error('Failed to update listeners on amphora %s. Skipping '
'this amphora as it is failing to update due to: %s', 'this amphora as it is failing to update due to: %s',
amphora_id, str(e)) amphora_id, str(e))
@ -129,14 +138,23 @@ class AmphoraIndexListenersReload(BaseAmphoraTask):
"""Task to reload all listeners on an amphora.""" """Task to reload all listeners on an amphora."""
def execute(self, loadbalancer, amphora_index, amphorae, def execute(self, loadbalancer, amphora_index, amphorae,
timeout_dict=None): amphorae_status: dict, timeout_dict=None):
"""Execute listener reload routines for listeners on an amphora.""" """Execute listener reload routines for listeners on an amphora."""
if amphorae is None:
return
amphora_id = amphorae[amphora_index].id
amphora_status = amphorae_status.get(amphora_id, {})
if amphora_status.get(constants.UNREACHABLE):
LOG.warning("Skipping listener reload because amphora %s "
"is not reachable.", amphora_id)
return
if loadbalancer.listeners: if loadbalancer.listeners:
try: try:
self.amphora_driver.reload( self.amphora_driver.reload(
loadbalancer, amphorae[amphora_index], timeout_dict) loadbalancer, amphorae[amphora_index], timeout_dict)
except Exception as e: except Exception as e:
amphora_id = amphorae[amphora_index].id
LOG.warning('Failed to reload listeners on amphora %s. ' LOG.warning('Failed to reload listeners on amphora %s. '
'Skipping this amphora as it is failing to ' 'Skipping this amphora as it is failing to '
'reload due to: %s', amphora_id, str(e)) 'reload due to: %s', amphora_id, str(e))
@ -305,8 +323,15 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask):
class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask): class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask):
"""Task to get and update the VRRP interface device name from amphora.""" """Task to get and update the VRRP interface device name from amphora."""
def execute(self, amphora_index, amphorae, timeout_dict=None): def execute(self, amphora_index, amphorae, amphorae_status: dict,
timeout_dict=None):
amphora_id = amphorae[amphora_index].id amphora_id = amphorae[amphora_index].id
amphora_status = amphorae_status.get(amphora_id, {})
if amphora_status.get(constants.UNREACHABLE):
LOG.warning("Skipping VRRP interface update because amphora %s "
"is not reachable.", amphora_id)
return None
try: try:
interface = self.amphora_driver.get_interface_from_ip( interface = self.amphora_driver.get_interface_from_ip(
amphorae[amphora_index], amphorae[amphora_index].vrrp_ip, amphorae[amphora_index], amphorae[amphora_index].vrrp_ip,
@ -354,14 +379,21 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask):
"""Task to update the VRRP configuration of an amphora.""" """Task to update the VRRP configuration of an amphora."""
def execute(self, loadbalancer_id, amphorae_network_config, amphora_index, def execute(self, loadbalancer_id, amphorae_network_config, amphora_index,
amphorae, amp_vrrp_int, timeout_dict=None): amphorae, amphorae_status: dict, amp_vrrp_int: Optional[str],
timeout_dict=None):
"""Execute update_vrrp_conf.""" """Execute update_vrrp_conf."""
loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(),
id=loadbalancer_id)
# Note, we don't want this to cause a revert as it may be used # Note, we don't want this to cause a revert as it may be used
# in a failover flow with both amps failing. Skip it and let # in a failover flow with both amps failing. Skip it and let
# health manager fix it. # health manager fix it.
amphora_id = amphorae[amphora_index].id amphora_id = amphorae[amphora_index].id
amphora_status = amphorae_status.get(amphora_id, {})
if amphora_status.get(constants.UNREACHABLE):
LOG.warning("Skipping VRRP configuration because amphora %s "
"is not reachable.", amphora_id)
return
loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(),
id=loadbalancer_id)
amphorae[amphora_index].vrrp_interface = amp_vrrp_int amphorae[amphora_index].vrrp_interface = amp_vrrp_int
try: try:
self.amphora_driver.update_vrrp_conf( self.amphora_driver.update_vrrp_conf(
@ -394,8 +426,15 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask):
This will reload keepalived if it is already running. This will reload keepalived if it is already running.
""" """
def execute(self, amphora_index, amphorae, timeout_dict=None): def execute(self, amphora_index, amphorae, amphorae_status: dict,
timeout_dict=None):
amphora_id = amphorae[amphora_index].id amphora_id = amphorae[amphora_index].id
amphora_status = amphorae_status.get(amphora_id, {})
if amphora_status.get(constants.UNREACHABLE):
LOG.warning("Skipping VRRP start because amphora %s "
"is not reachable.", amphora_id)
return
try: try:
self.amphora_driver.start_vrrp_service(amphorae[amphora_index], self.amphora_driver.start_vrrp_service(amphorae[amphora_index],
timeout_dict) timeout_dict)
@ -451,3 +490,40 @@ class AmphoraConfigUpdate(BaseAmphoraTask):
LOG.error('Amphora %s does not support agent configuration ' LOG.error('Amphora %s does not support agent configuration '
'update. Please update the amphora image for this ' 'update. Please update the amphora image for this '
'amphora. Skipping.', amphora.id) 'amphora. Skipping.', amphora.id)
class AmphoraeGetConnectivityStatus(BaseAmphoraTask):
"""Task that checks amphorae connectivity status.
Check and return the connectivity status of both amphorae in ACTIVE STANDBY
load balancers
"""
def execute(self, amphorae: List[dict], new_amphora_id: str,
timeout_dict=None):
amphorae_status = {}
for amphora in amphorae:
amphora_id = amphora.id
amphorae_status[amphora_id] = {}
session = db_apis.get_session()
with session.begin():
db_amp = self.amphora_repo.get(session, id=amphora_id)
try:
# Verify if the amphora is reachable
self.amphora_driver.check(db_amp, timeout_dict=timeout_dict)
except Exception as e:
LOG.exception("Cannot get status for amphora %s",
amphora_id)
# In case it fails and the tested amphora is the newly created
# amphora, it's not a normal error handling, re-raise the
# exception
if amphora_id == new_amphora_id:
raise e
amphorae_status[amphora_id][constants.UNREACHABLE] = True
else:
amphorae_status[amphora_id][constants.UNREACHABLE] = False
return amphorae_status

View File

@ -226,7 +226,8 @@ class AmphoraFlows(object):
return delete_amphora_flow return delete_amphora_flow
def get_vrrp_subflow(self, prefix, timeout_dict=None, def get_vrrp_subflow(self, prefix, timeout_dict=None,
create_vrrp_group=True): create_vrrp_group=True,
get_amphorae_status=True):
sf_name = prefix + '-' + constants.GET_VRRP_SUBFLOW sf_name = prefix + '-' + constants.GET_VRRP_SUBFLOW
vrrp_subflow = linear_flow.Flow(sf_name) vrrp_subflow = linear_flow.Flow(sf_name)
@ -242,6 +243,17 @@ class AmphoraFlows(object):
requires=constants.LOADBALANCER_ID, requires=constants.LOADBALANCER_ID,
provides=constants.AMPHORAE_NETWORK_CONFIG)) provides=constants.AMPHORAE_NETWORK_CONFIG))
if get_amphorae_status:
# Get the amphorae_status dict in case the caller hasn't fetched
# it yet.
vrrp_subflow.add(
amphora_driver_tasks.AmphoraeGetConnectivityStatus(
name=constants.AMPHORAE_GET_CONNECTIVITY_STATUS,
requires=constants.AMPHORAE,
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.TIMEOUT_DICT: timeout_dict},
provides=constants.AMPHORAE_STATUS))
# VRRP update needs to be run on all amphora to update # VRRP update needs to be run on all amphora to update
# their peer configurations. So parallelize this with an # their peer configurations. So parallelize this with an
# unordered subflow. # unordered subflow.
@ -252,7 +264,7 @@ class AmphoraFlows(object):
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF, name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF,
requires=constants.AMPHORAE, requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
inject={constants.AMPHORA_INDEX: 0, inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict}, constants.TIMEOUT_DICT: timeout_dict},
provides=constants.AMP_VRRP_INT)) provides=constants.AMP_VRRP_INT))
@ -261,13 +273,13 @@ class AmphoraFlows(object):
name=sf_name + '-0-' + constants.AMP_VRRP_UPDATE, name=sf_name + '-0-' + constants.AMP_VRRP_UPDATE,
requires=(constants.LOADBALANCER_ID, requires=(constants.LOADBALANCER_ID,
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
constants.AMP_VRRP_INT), constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT),
inject={constants.AMPHORA_INDEX: 0, inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
name=sf_name + '-0-' + constants.AMP_VRRP_START, name=sf_name + '-0-' + constants.AMP_VRRP_START,
requires=constants.AMPHORAE, requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
inject={constants.AMPHORA_INDEX: 0, inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
@ -275,7 +287,7 @@ class AmphoraFlows(object):
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF, name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF,
requires=constants.AMPHORAE, requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
inject={constants.AMPHORA_INDEX: 1, inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict}, constants.TIMEOUT_DICT: timeout_dict},
provides=constants.AMP_VRRP_INT)) provides=constants.AMP_VRRP_INT))
@ -284,12 +296,12 @@ class AmphoraFlows(object):
name=sf_name + '-1-' + constants.AMP_VRRP_UPDATE, name=sf_name + '-1-' + constants.AMP_VRRP_UPDATE,
requires=(constants.LOADBALANCER_ID, requires=(constants.LOADBALANCER_ID,
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
constants.AMP_VRRP_INT), constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT),
inject={constants.AMPHORA_INDEX: 1, inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
name=sf_name + '-1-' + constants.AMP_VRRP_START, name=sf_name + '-1-' + constants.AMP_VRRP_START,
requires=constants.AMPHORAE, requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
inject={constants.AMPHORA_INDEX: 1, inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
@ -538,6 +550,14 @@ class AmphoraFlows(object):
constants.CONN_RETRY_INTERVAL: constants.CONN_RETRY_INTERVAL:
CONF.haproxy_amphora.active_connection_retry_interval} CONF.haproxy_amphora.active_connection_retry_interval}
failover_amp_flow.add(
amphora_driver_tasks.AmphoraeGetConnectivityStatus(
name=constants.AMPHORAE_GET_CONNECTIVITY_STATUS,
requires=constants.AMPHORAE,
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.TIMEOUT_DICT: timeout_dict},
provides=constants.AMPHORAE_STATUS))
# Listeners update needs to be run on all amphora to update # Listeners update needs to be run on all amphora to update
# their peer configurations. So parallelize this with an # their peer configurations. So parallelize this with an
# unordered subflow. # unordered subflow.
@ -548,7 +568,8 @@ class AmphoraFlows(object):
update_amps_subflow.add( update_amps_subflow.add(
amphora_driver_tasks.AmphoraIndexListenerUpdate( amphora_driver_tasks.AmphoraIndexListenerUpdate(
name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE, name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE,
requires=(constants.LOADBALANCER, constants.AMPHORAE), requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS),
inject={constants.AMPHORA_INDEX: amp_index, inject={constants.AMPHORA_INDEX: amp_index,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
@ -558,7 +579,8 @@ class AmphoraFlows(object):
if lb_amp_count == 2: if lb_amp_count == 2:
failover_amp_flow.add( failover_amp_flow.add(
self.get_vrrp_subflow(constants.GET_VRRP_SUBFLOW, self.get_vrrp_subflow(constants.GET_VRRP_SUBFLOW,
timeout_dict, create_vrrp_group=False)) timeout_dict, create_vrrp_group=False,
get_amphorae_status=False))
# Reload the listener. This needs to be done here because # Reload the listener. This needs to be done here because
# it will create the required haproxy check scripts for # it will create the required haproxy check scripts for
@ -574,7 +596,8 @@ class AmphoraFlows(object):
amphora_driver_tasks.AmphoraIndexListenersReload( amphora_driver_tasks.AmphoraIndexListenersReload(
name=(str(amp_index) + '-' + name=(str(amp_index) + '-' +
constants.AMPHORA_RELOAD_LISTENER), constants.AMPHORA_RELOAD_LISTENER),
requires=(constants.LOADBALANCER, constants.AMPHORAE), requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS),
inject={constants.AMPHORA_INDEX: amp_index, inject={constants.AMPHORA_INDEX: amp_index,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))

View File

@ -637,6 +637,14 @@ class LoadBalancerFlows(object):
requires=constants.LOADBALANCER_ID, requires=constants.LOADBALANCER_ID,
provides=constants.AMPHORAE)) provides=constants.AMPHORAE))
failover_LB_flow.add(
amphora_driver_tasks.AmphoraeGetConnectivityStatus(
name=(new_amp_role + '-' +
constants.AMPHORAE_GET_CONNECTIVITY_STATUS),
requires=constants.AMPHORAE,
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
provides=constants.AMPHORAE_STATUS))
# Listeners update needs to be run on all amphora to update # Listeners update needs to be run on all amphora to update
# their peer configurations. So parallelize this with an # their peer configurations. So parallelize this with an
# unordered subflow. # unordered subflow.
@ -651,14 +659,16 @@ class LoadBalancerFlows(object):
amphora_driver_tasks.AmphoraIndexListenerUpdate( amphora_driver_tasks.AmphoraIndexListenerUpdate(
name=(constants.AMPHORA + '-0-' + name=(constants.AMPHORA + '-0-' +
constants.AMP_LISTENER_UPDATE), constants.AMP_LISTENER_UPDATE),
requires=(constants.LOADBALANCER, constants.AMPHORAE), requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS),
inject={constants.AMPHORA_INDEX: 0, inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
update_amps_subflow.add( update_amps_subflow.add(
amphora_driver_tasks.AmphoraIndexListenerUpdate( amphora_driver_tasks.AmphoraIndexListenerUpdate(
name=(constants.AMPHORA + '-1-' + name=(constants.AMPHORA + '-1-' +
constants.AMP_LISTENER_UPDATE), constants.AMP_LISTENER_UPDATE),
requires=(constants.LOADBALANCER, constants.AMPHORAE), requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS),
inject={constants.AMPHORA_INDEX: 1, inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
@ -667,7 +677,8 @@ class LoadBalancerFlows(object):
# Configure and enable keepalived in the amphora # Configure and enable keepalived in the amphora
failover_LB_flow.add(self.amp_flows.get_vrrp_subflow( failover_LB_flow.add(self.amp_flows.get_vrrp_subflow(
new_amp_role + '-' + constants.GET_VRRP_SUBFLOW, new_amp_role + '-' + constants.GET_VRRP_SUBFLOW,
timeout_dict, create_vrrp_group=False)) timeout_dict, create_vrrp_group=False,
get_amphorae_status=False))
# #### End of standby #### # #### End of standby ####

View File

@ -14,6 +14,9 @@
# #
import copy import copy
from typing import List
from typing import Optional
from cryptography import fernet from cryptography import fernet
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
@ -99,10 +102,19 @@ class AmpListenersUpdate(BaseAmphoraTask):
class AmphoraIndexListenerUpdate(BaseAmphoraTask): class AmphoraIndexListenerUpdate(BaseAmphoraTask):
"""Task to update the listeners on one amphora.""" """Task to update the listeners on one amphora."""
def execute(self, loadbalancer, amphora_index, amphorae, timeout_dict=()): def execute(self, loadbalancer, amphora_index, amphorae,
amphorae_status: dict, timeout_dict=()):
# Note, we don't want this to cause a revert as it may be used # Note, we don't want this to cause a revert as it may be used
# in a failover flow with both amps failing. Skip it and let # in a failover flow with both amps failing. Skip it and let
# health manager fix it. # health manager fix it.
amphora_id = amphorae[amphora_index].get(constants.ID)
amphora_status = amphorae_status.get(amphora_id, {})
if amphora_status.get(constants.UNREACHABLE):
LOG.warning("Skipping listener update because amphora %s "
"is not reachable.", amphora_id)
return
try: try:
# TODO(johnsom) Optimize this to use the dicts and not need the # TODO(johnsom) Optimize this to use the dicts and not need the
# DB lookups # DB lookups
@ -115,7 +127,6 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask):
self.amphora_driver.update_amphora_listeners( self.amphora_driver.update_amphora_listeners(
db_lb, db_amp, timeout_dict) db_lb, db_amp, timeout_dict)
except Exception as e: except Exception as e:
amphora_id = amphorae[amphora_index].get(constants.ID)
LOG.error('Failed to update listeners on amphora %s. Skipping ' LOG.error('Failed to update listeners on amphora %s. Skipping '
'this amphora as it is failing to update due to: %s', 'this amphora as it is failing to update due to: %s',
amphora_id, str(e)) amphora_id, str(e))
@ -177,10 +188,18 @@ class AmphoraIndexListenersReload(BaseAmphoraTask):
"""Task to reload all listeners on an amphora.""" """Task to reload all listeners on an amphora."""
def execute(self, loadbalancer, amphora_index, amphorae, def execute(self, loadbalancer, amphora_index, amphorae,
timeout_dict=None): amphorae_status: dict, timeout_dict=None):
"""Execute listener reload routines for listeners on an amphora.""" """Execute listener reload routines for listeners on an amphora."""
if amphorae is None: if amphorae is None:
return return
amphora_id = amphorae[amphora_index].get(constants.ID)
amphora_status = amphorae_status.get(amphora_id, {})
if amphora_status.get(constants.UNREACHABLE):
LOG.warning("Skipping listener reload because amphora %s "
"is not reachable.", amphora_id)
return
# TODO(johnsom) Optimize this to use the dicts and not need the # TODO(johnsom) Optimize this to use the dicts and not need the
# DB lookups # DB lookups
db_amp = self.amphora_repo.get( db_amp = self.amphora_repo.get(
@ -192,7 +211,6 @@ class AmphoraIndexListenersReload(BaseAmphoraTask):
try: try:
self.amphora_driver.reload(db_lb, db_amp, timeout_dict) self.amphora_driver.reload(db_lb, db_amp, timeout_dict)
except Exception as e: except Exception as e:
amphora_id = amphorae[amphora_index][constants.ID]
LOG.warning('Failed to reload listeners on amphora %s. ' LOG.warning('Failed to reload listeners on amphora %s. '
'Skipping this amphora as it is failing to ' 'Skipping this amphora as it is failing to '
'reload due to: %s', amphora_id, str(e)) 'reload due to: %s', amphora_id, str(e))
@ -437,8 +455,15 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask):
class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask): class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask):
"""Task to get and update the VRRP interface device name from amphora.""" """Task to get and update the VRRP interface device name from amphora."""
def execute(self, amphora_index, amphorae, timeout_dict=None): def execute(self, amphora_index, amphorae, amphorae_status: dict,
timeout_dict=None):
amphora_id = amphorae[amphora_index][constants.ID] amphora_id = amphorae[amphora_index][constants.ID]
amphora_status = amphorae_status.get(amphora_id, {})
if amphora_status.get(constants.UNREACHABLE):
LOG.warning("Skipping VRRP interface update because amphora %s "
"is not reachable.", amphora_id)
return None
try: try:
# TODO(johnsom) Optimize this to use the dicts and not need the # TODO(johnsom) Optimize this to use the dicts and not need the
# DB lookups # DB lookups
@ -494,12 +519,19 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask):
"""Task to update the VRRP configuration of an amphora.""" """Task to update the VRRP configuration of an amphora."""
def execute(self, loadbalancer_id, amphorae_network_config, amphora_index, def execute(self, loadbalancer_id, amphorae_network_config, amphora_index,
amphorae, amp_vrrp_int, timeout_dict=None): amphorae, amphorae_status: dict, amp_vrrp_int: Optional[str],
timeout_dict=None):
"""Execute update_vrrp_conf.""" """Execute update_vrrp_conf."""
# Note, we don't want this to cause a revert as it may be used # Note, we don't want this to cause a revert as it may be used
# in a failover flow with both amps failing. Skip it and let # in a failover flow with both amps failing. Skip it and let
# health manager fix it. # health manager fix it.
amphora_id = amphorae[amphora_index][constants.ID] amphora_id = amphorae[amphora_index][constants.ID]
amphora_status = amphorae_status.get(amphora_id, {})
if amphora_status.get(constants.UNREACHABLE):
LOG.warning("Skipping VRRP configuration because amphora %s "
"is not reachable.", amphora_id)
return
try: try:
# TODO(johnsom) Optimize this to use the dicts and not need the # TODO(johnsom) Optimize this to use the dicts and not need the
# DB lookups # DB lookups
@ -541,10 +573,17 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask):
This will reload keepalived if it is already running. This will reload keepalived if it is already running.
""" """
def execute(self, amphora_index, amphorae, timeout_dict=None): def execute(self, amphora_index, amphorae, amphorae_status: dict,
timeout_dict=None):
# TODO(johnsom) Optimize this to use the dicts and not need the # TODO(johnsom) Optimize this to use the dicts and not need the
# DB lookups # DB lookups
amphora_id = amphorae[amphora_index][constants.ID] amphora_id = amphorae[amphora_index][constants.ID]
amphora_status = amphorae_status.get(amphora_id, {})
if amphora_status.get(constants.UNREACHABLE):
LOG.warning("Skipping VRRP start because amphora %s "
"is not reachable.", amphora_id)
return
db_amp = self.amphora_repo.get(db_apis.get_session(), id=amphora_id) db_amp = self.amphora_repo.get(db_apis.get_session(), id=amphora_id)
try: try:
self.amphora_driver.start_vrrp_service(db_amp, timeout_dict) self.amphora_driver.start_vrrp_service(db_amp, timeout_dict)
@ -608,3 +647,40 @@ class AmphoraConfigUpdate(BaseAmphoraTask):
'update. Please update the amphora image for this ' 'update. Please update the amphora image for this '
'amphora. Skipping.'. 'amphora. Skipping.'.
format(amphora.get(constants.ID))) format(amphora.get(constants.ID)))
class AmphoraeGetConnectivityStatus(BaseAmphoraTask):
"""Task that checks amphorae connectivity status.
Check and return the connectivity status of both amphorae in ACTIVE STANDBY
load balancers
"""
def execute(self, amphorae: List[dict], new_amphora_id: str,
timeout_dict=None):
amphorae_status = {}
for amphora in amphorae:
amphora_id = amphora[constants.ID]
amphorae_status[amphora_id] = {}
session = db_apis.get_session()
with session.begin():
db_amp = self.amphora_repo.get(session, id=amphora_id)
try:
# Verify if the amphora is reachable
self.amphora_driver.check(db_amp, timeout_dict=timeout_dict)
except Exception as e:
LOG.exception("Cannot get status for amphora %s",
amphora_id)
# In case it fails and the tested amphora is the newly created
# amphora, it's not a normal error handling, re-raise the
# exception
if amphora_id == new_amphora_id:
raise e
amphorae_status[amphora_id][constants.UNREACHABLE] = True
else:
amphorae_status[amphora_id][constants.UNREACHABLE] = False
return amphorae_status

View File

@ -75,9 +75,9 @@ class TestHAProxyAmphoraDriver(base.TestCase):
mock_api_version.reset_mock() mock_api_version.reset_mock()
client_mock.reset_mock() client_mock.reset_mock()
result = self.driver.get_interface_from_ip(amphora_mock, IP_ADDRESS) self.assertRaises(
exc.NotFound,
self.assertIsNone(result) self.driver.get_interface_from_ip, amphora_mock, IP_ADDRESS)
mock_api_version.assert_called_once_with(amphora_mock, None) mock_api_version.assert_called_once_with(amphora_mock, None)
client_mock.get_interface.assert_called_once_with( client_mock.get_interface.assert_called_once_with(
amphora_mock, IP_ADDRESS, None, log_error=False) amphora_mock, IP_ADDRESS, None, log_error=False)

View File

@ -243,6 +243,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.AMPHORA, amp_flow.provides) self.assertIn(constants.AMPHORA, amp_flow.provides)
self.assertIn(constants.AMPHORA_ID, amp_flow.provides) self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
self.assertIn(constants.AMPHORAE, amp_flow.provides) self.assertIn(constants.AMPHORAE, amp_flow.provides)
self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides)
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
self.assertIn(constants.BASE_PORT, amp_flow.provides) self.assertIn(constants.BASE_PORT, amp_flow.provides)
self.assertIn(constants.COMPUTE_ID, amp_flow.provides) self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
@ -253,7 +254,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.VIP_SG_ID, amp_flow.provides) self.assertIn(constants.VIP_SG_ID, amp_flow.provides)
self.assertEqual(7, len(amp_flow.requires)) self.assertEqual(7, len(amp_flow.requires))
self.assertEqual(13, len(amp_flow.provides)) self.assertEqual(14, len(amp_flow.provides))
def test_get_failover_flow_standalone(self, mock_get_net_driver): def test_get_failover_flow_standalone(self, mock_get_net_driver):
failed_amphora = data_models.Amphora( failed_amphora = data_models.Amphora(
@ -276,6 +277,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.AMPHORA, amp_flow.provides) self.assertIn(constants.AMPHORA, amp_flow.provides)
self.assertIn(constants.AMPHORA_ID, amp_flow.provides) self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
self.assertIn(constants.AMPHORAE, amp_flow.provides) self.assertIn(constants.AMPHORAE, amp_flow.provides)
self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides)
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
self.assertIn(constants.BASE_PORT, amp_flow.provides) self.assertIn(constants.BASE_PORT, amp_flow.provides)
self.assertIn(constants.COMPUTE_ID, amp_flow.provides) self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
@ -286,7 +288,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.VIP_SG_ID, amp_flow.provides) self.assertIn(constants.VIP_SG_ID, amp_flow.provides)
self.assertEqual(7, len(amp_flow.requires)) self.assertEqual(7, len(amp_flow.requires))
self.assertEqual(12, len(amp_flow.provides)) self.assertEqual(13, len(amp_flow.provides))
def test_get_failover_flow_bogus_role(self, mock_get_net_driver): def test_get_failover_flow_bogus_role(self, mock_get_net_driver):
failed_amphora = data_models.Amphora(id=uuidutils.generate_uuid(), failed_amphora = data_models.Amphora(id=uuidutils.generate_uuid(),
@ -324,12 +326,30 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides)
self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides) self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides)
self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.provides)
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires)
self.assertEqual(3, len(vrrp_subflow.provides))
self.assertEqual(3, len(vrrp_subflow.requires))
def test_get_vrrp_subflow_dont_get_status(self, mock_get_net_driver):
vrrp_subflow = self.AmpFlow.get_vrrp_subflow('123',
get_amphorae_status=False)
self.assertIsInstance(vrrp_subflow, flow.Flow)
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides)
self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides)
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.requires)
self.assertEqual(2, len(vrrp_subflow.provides)) self.assertEqual(2, len(vrrp_subflow.provides))
self.assertEqual(2, len(vrrp_subflow.requires)) self.assertEqual(3, len(vrrp_subflow.requires))
def test_get_vrrp_subflow_dont_create_vrrp_group( def test_get_vrrp_subflow_dont_create_vrrp_group(
self, mock_get_net_driver): self, mock_get_net_driver):
@ -340,12 +360,14 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides)
self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides) self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides)
self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.provides)
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires)
self.assertEqual(2, len(vrrp_subflow.provides)) self.assertEqual(3, len(vrrp_subflow.provides))
self.assertEqual(2, len(vrrp_subflow.requires)) self.assertEqual(3, len(vrrp_subflow.requires))
def test_get_post_map_lb_subflow(self, mock_get_net_driver): def test_get_post_map_lb_subflow(self, mock_get_net_driver):

View File

@ -155,10 +155,16 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires) self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
self.assertIn(constants.UPDATE_DICT, amp_flow.requires) self.assertIn(constants.UPDATE_DICT, amp_flow.requires)
self.assertIn(constants.AMPHORA_ID, amp_flow.requires)
self.assertIn(constants.AMPHORAE, amp_flow.provides)
self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides)
self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides)
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides) self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(4, len(amp_flow.provides)) self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(2, len(amp_flow.requires)) self.assertEqual(3, len(amp_flow.requires))
amp_flow = self.LBFlow.get_post_lb_amp_association_flow( amp_flow = self.LBFlow.get_post_lb_amp_association_flow(
'123', constants.TOPOLOGY_ACTIVE_STANDBY) '123', constants.TOPOLOGY_ACTIVE_STANDBY)
@ -167,10 +173,16 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires) self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
self.assertIn(constants.UPDATE_DICT, amp_flow.requires) self.assertIn(constants.UPDATE_DICT, amp_flow.requires)
self.assertIn(constants.AMPHORA_ID, amp_flow.requires)
self.assertIn(constants.AMPHORAE, amp_flow.provides)
self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides)
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides) self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(4, len(amp_flow.provides)) self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(2, len(amp_flow.requires)) self.assertEqual(3, len(amp_flow.requires))
def test_get_create_load_balancer_flows_single_listeners( def test_get_create_load_balancer_flows_single_listeners(
self, mock_get_net_driver): self, mock_get_net_driver):
@ -219,6 +231,7 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIn(constants.LISTENERS, create_flow.provides) self.assertIn(constants.LISTENERS, create_flow.provides)
self.assertIn(constants.AMPHORA, create_flow.provides) self.assertIn(constants.AMPHORA, create_flow.provides)
self.assertIn(constants.AMPHORA_ID, create_flow.provides) self.assertIn(constants.AMPHORA_ID, create_flow.provides)
self.assertIn(constants.AMPHORAE_STATUS, create_flow.provides)
self.assertIn(constants.COMPUTE_ID, create_flow.provides) self.assertIn(constants.COMPUTE_ID, create_flow.provides)
self.assertIn(constants.COMPUTE_OBJ, create_flow.provides) self.assertIn(constants.COMPUTE_OBJ, create_flow.provides)
self.assertIn(constants.LOADBALANCER, create_flow.provides) self.assertIn(constants.LOADBALANCER, create_flow.provides)
@ -230,7 +243,7 @@ class TestLoadBalancerFlows(base.TestCase):
create_flow.provides) create_flow.provides)
self.assertEqual(6, len(create_flow.requires)) self.assertEqual(6, len(create_flow.requires))
self.assertEqual(16, len(create_flow.provides), self.assertEqual(17, len(create_flow.provides),
create_flow.provides) create_flow.provides)
def _test_get_failover_LB_flow_single(self, amphorae): def _test_get_failover_LB_flow_single(self, amphorae):
@ -322,6 +335,7 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIn(constants.AMPHORA, failover_flow.provides) self.assertIn(constants.AMPHORA, failover_flow.provides)
self.assertIn(constants.AMPHORA_ID, failover_flow.provides) self.assertIn(constants.AMPHORA_ID, failover_flow.provides)
self.assertIn(constants.AMPHORAE, failover_flow.provides) self.assertIn(constants.AMPHORAE, failover_flow.provides)
self.assertIn(constants.AMPHORAE_STATUS, failover_flow.provides)
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, self.assertIn(constants.AMPHORAE_NETWORK_CONFIG,
failover_flow.provides) failover_flow.provides)
self.assertIn(constants.BASE_PORT, failover_flow.provides) self.assertIn(constants.BASE_PORT, failover_flow.provides)
@ -339,7 +353,7 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertEqual(6, len(failover_flow.requires), self.assertEqual(6, len(failover_flow.requires),
failover_flow.requires) failover_flow.requires)
self.assertEqual(16, len(failover_flow.provides), self.assertEqual(17, len(failover_flow.provides),
failover_flow.provides) failover_flow.provides)
def test_get_failover_LB_flow_no_amps_act_stdby(self, mock_get_net_driver): def test_get_failover_LB_flow_no_amps_act_stdby(self, mock_get_net_driver):

View File

@ -125,17 +125,37 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_amphora_repo_update): mock_amphora_repo_update):
mock_lb_repo_get.return_value = _LB_mock mock_lb_repo_get.return_value = _LB_mock
amphorae_status = {
_amphora_mock.id: {
constants.UNREACHABLE: False
}
}
amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate() amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate()
amp_list_update_obj.execute(_load_balancer_mock, 0, amp_list_update_obj.execute(_load_balancer_mock, 0,
[_amphora_mock], self.timeout_dict) [_amphora_mock], amphorae_status,
self.timeout_dict)
mock_driver.update_amphora_listeners.assert_called_once_with( mock_driver.update_amphora_listeners.assert_called_once_with(
_LB_mock, _amphora_mock, self.timeout_dict) _LB_mock, _amphora_mock, self.timeout_dict)
# Unreachable amp
mock_driver.reset_mock()
amphorae_status = {
_amphora_mock.id: {
constants.UNREACHABLE: True
}
}
amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock],
amphorae_status, self.timeout_dict)
mock_driver.update_amphora_listeners.assert_not_called()
# Test exception
mock_driver.update_amphora_listeners.side_effect = Exception('boom') mock_driver.update_amphora_listeners.side_effect = Exception('boom')
amp_list_update_obj.execute(_load_balancer_mock, 0, amp_list_update_obj.execute(_load_balancer_mock, 0,
[_amphora_mock], self.timeout_dict) [_amphora_mock], {},
self.timeout_dict)
mock_amphora_repo_update.assert_called_once_with( mock_amphora_repo_update.assert_called_once_with(
_session_mock, AMP_ID, status=constants.ERROR) _session_mock, AMP_ID, status=constants.ERROR)
@ -217,20 +237,38 @@ class TestAmphoraDriverTasks(base.TestCase):
# Test no listeners # Test no listeners
mock_lb.listeners = None mock_lb.listeners = None
listeners_reload_obj.execute(mock_lb, 0, None) listeners_reload_obj.execute(mock_lb, 0, None, {})
mock_driver.reload.assert_not_called() mock_driver.reload.assert_not_called()
# Test with listeners # Test with listeners
amphorae_status = {
_amphora_mock.id: {
constants.UNREACHABLE: False
}
}
mock_driver.start.reset_mock() mock_driver.start.reset_mock()
mock_lb.listeners = [mock_listener] mock_lb.listeners = [mock_listener]
listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], listeners_reload_obj.execute(mock_lb, 0, [amphora_mock],
amphorae_status,
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock,
self.timeout_dict) self.timeout_dict)
# Unreachable amp
amphorae_status = {
_amphora_mock.id: {
constants.UNREACHABLE: True
}
}
mock_driver.reload.reset_mock()
listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock],
amphorae_status,
timeout_dict=self.timeout_dict)
mock_driver.reload.assert_not_called()
# Test with reload exception # Test with reload exception
mock_driver.reload.reset_mock() mock_driver.reload.reset_mock()
listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], {},
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock,
self.timeout_dict) self.timeout_dict)
@ -604,6 +642,11 @@ class TestAmphoraDriverTasks(base.TestCase):
_LB_mock.amphorae = _amphorae_mock _LB_mock.amphorae = _amphorae_mock
mock_driver.get_interface_from_ip.side_effect = [FAKE_INTERFACE, mock_driver.get_interface_from_ip.side_effect = [FAKE_INTERFACE,
Exception('boom')] Exception('boom')]
amphorae_status = {
_amphora_mock.id: {
constants.UNREACHABLE: False
}
}
timeout_dict = {constants.CONN_MAX_RETRIES: CONN_MAX_RETRIES, timeout_dict = {constants.CONN_MAX_RETRIES: CONN_MAX_RETRIES,
constants.CONN_RETRY_INTERVAL: CONN_RETRY_INTERVAL} constants.CONN_RETRY_INTERVAL: CONN_RETRY_INTERVAL}
@ -611,16 +654,27 @@ class TestAmphoraDriverTasks(base.TestCase):
amphora_update_vrrp_interface_obj = ( amphora_update_vrrp_interface_obj = (
amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface()) amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface())
amphora_update_vrrp_interface_obj.execute( amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], timeout_dict) 0, [_amphora_mock], amphorae_status, timeout_dict)
mock_driver.get_interface_from_ip.assert_called_once_with( mock_driver.get_interface_from_ip.assert_called_once_with(
_amphora_mock, _amphora_mock.vrrp_ip, timeout_dict=timeout_dict) _amphora_mock, _amphora_mock.vrrp_ip, timeout_dict=timeout_dict)
mock_amphora_repo_update.assert_called_once_with( mock_amphora_repo_update.assert_called_once_with(
_session_mock, _amphora_mock.id, vrrp_interface=FAKE_INTERFACE) _session_mock, _amphora_mock.id, vrrp_interface=FAKE_INTERFACE)
# Unreachable amp
mock_driver.reset_mock()
amphorae_status = {
_amphora_mock.id: {
constants.UNREACHABLE: True
}
}
amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], amphorae_status, timeout_dict)
mock_driver.get_interface_from_ip.assert_not_called()
# Test with an exception # Test with an exception
mock_amphora_repo_update.reset_mock() mock_amphora_repo_update.reset_mock()
amphora_update_vrrp_interface_obj.execute( amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], timeout_dict) 0, [_amphora_mock], {}, timeout_dict)
mock_amphora_repo_update.assert_called_once_with( mock_amphora_repo_update.assert_called_once_with(
_session_mock, _amphora_mock.id, status=constants.ERROR) _session_mock, _amphora_mock.id, status=constants.ERROR)
@ -666,20 +720,41 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_driver.update_vrrp_conf.side_effect = [mock.DEFAULT, mock_driver.update_vrrp_conf.side_effect = [mock.DEFAULT,
Exception('boom')] Exception('boom')]
mock_lb_get.return_value = _LB_mock mock_lb_get.return_value = _LB_mock
amphorae_status = {
_amphora_mock.id: {
constants.UNREACHABLE: False
}
}
amphora_vrrp_update_obj = ( amphora_vrrp_update_obj = (
amphora_driver_tasks.AmphoraIndexVRRPUpdate()) amphora_driver_tasks.AmphoraIndexVRRPUpdate())
amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config, amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config,
0, [_amphora_mock], 'fakeint0', 0, [_amphora_mock], amphorae_status,
'fakeint0',
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.update_vrrp_conf.assert_called_once_with( mock_driver.update_vrrp_conf.assert_called_once_with(
_LB_mock, amphorae_network_config, _amphora_mock, _LB_mock, amphorae_network_config, _amphora_mock,
self.timeout_dict) self.timeout_dict)
# Unreachable amp
amphorae_status = {
_amphora_mock.id: {
constants.UNREACHABLE: True
}
}
mock_amphora_repo_update.reset_mock()
mock_driver.update_vrrp_conf.reset_mock()
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
0, [_amphora_mock], amphorae_status,
None)
mock_driver.update_vrrp_conf.assert_not_called()
# Test with an exception # Test with an exception
mock_amphora_repo_update.reset_mock() mock_amphora_repo_update.reset_mock()
amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config, amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config,
0, [_amphora_mock], 'fakeint0') 0, [_amphora_mock], {},
'fakeint0')
mock_amphora_repo_update.assert_called_once_with( mock_amphora_repo_update.assert_called_once_with(
_session_mock, _amphora_mock.id, status=constants.ERROR) _session_mock, _amphora_mock.id, status=constants.ERROR)
@ -706,19 +781,36 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_listener_repo_get, mock_listener_repo_get,
mock_listener_repo_update, mock_listener_repo_update,
mock_amphora_repo_update): mock_amphora_repo_update):
amphorae_status = {
_amphora_mock.id: {
constants.UNREACHABLE: False
}
}
amphora_vrrp_start_obj = ( amphora_vrrp_start_obj = (
amphora_driver_tasks.AmphoraIndexVRRPStart()) amphora_driver_tasks.AmphoraIndexVRRPStart())
mock_driver.start_vrrp_service.side_effect = [mock.DEFAULT, mock_driver.start_vrrp_service.side_effect = [mock.DEFAULT,
Exception('boom')] Exception('boom')]
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status,
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with( mock_driver.start_vrrp_service.assert_called_once_with(
_amphora_mock, self.timeout_dict) _amphora_mock, self.timeout_dict)
# Unreachable amp
mock_driver.start_vrrp_service.reset_mock()
amphorae_status = {
_amphora_mock.id: {
constants.UNREACHABLE: True
}
}
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status,
timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_not_called()
# Test with a start exception # Test with a start exception
mock_driver.start_vrrp_service.reset_mock() mock_driver.start_vrrp_service.reset_mock()
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphora_vrrp_start_obj.execute(0, [_amphora_mock], {},
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with( mock_driver.start_vrrp_service.assert_called_once_with(
_amphora_mock, self.timeout_dict) _amphora_mock, self.timeout_dict)
@ -790,3 +882,75 @@ class TestAmphoraDriverTasks(base.TestCase):
self.assertRaises(driver_except.TimeOutException, self.assertRaises(driver_except.TimeOutException,
amp_config_update_obj.execute, amp_config_update_obj.execute,
_amphora_mock, flavor) _amphora_mock, flavor)
@mock.patch('octavia.db.repositories.AmphoraRepository.get')
def test_amphorae_get_connectivity_status(self,
mock_amphora_repo_get,
mock_driver,
mock_generate_uuid,
mock_log,
mock_get_session,
mock_listener_repo_get,
mock_listener_repo_update,
mock_amphora_repo_update):
amphora1_mock = mock.MagicMock()
amphora1_mock.id = 'id1'
amphora2_mock = mock.MagicMock()
amphora2_mock.id = 'id2'
db_amphora1_mock = mock.Mock()
db_amphora2_mock = mock.Mock()
amp_get_connectivity_status = (
amphora_driver_tasks.AmphoraeGetConnectivityStatus())
# All amphorae reachable
mock_amphora_repo_get.side_effect = [
db_amphora1_mock,
db_amphora2_mock]
mock_driver.check.return_value = None
ret = amp_get_connectivity_status.execute(
[amphora1_mock, amphora2_mock],
amphora1_mock.id,
timeout_dict=self.timeout_dict)
mock_driver.check.assert_has_calls(
[mock.call(db_amphora1_mock, timeout_dict=self.timeout_dict),
mock.call(db_amphora2_mock, timeout_dict=self.timeout_dict)])
self.assertFalse(
ret[amphora1_mock.id][constants.UNREACHABLE])
self.assertFalse(
ret[amphora2_mock.id][constants.UNREACHABLE])
# amphora1 unreachable
mock_driver.check.reset_mock()
mock_amphora_repo_get.side_effect = [
db_amphora1_mock,
db_amphora2_mock]
mock_driver.check.side_effect = [
driver_except.TimeOutException, None]
self.assertRaises(driver_except.TimeOutException,
amp_get_connectivity_status.execute,
[amphora1_mock, amphora2_mock],
amphora1_mock.id,
timeout_dict=self.timeout_dict)
mock_driver.check.assert_called_with(
db_amphora1_mock, timeout_dict=self.timeout_dict)
# amphora2 unreachable
mock_driver.check.reset_mock()
mock_amphora_repo_get.side_effect = [
db_amphora1_mock,
db_amphora2_mock]
mock_driver.check.side_effect = [
None, driver_except.TimeOutException]
ret = amp_get_connectivity_status.execute(
[amphora1_mock, amphora2_mock],
amphora1_mock.id,
timeout_dict=self.timeout_dict)
mock_driver.check.assert_has_calls(
[mock.call(db_amphora1_mock, timeout_dict=self.timeout_dict),
mock.call(db_amphora2_mock, timeout_dict=self.timeout_dict)])
self.assertFalse(
ret[amphora1_mock.id][constants.UNREACHABLE])
self.assertTrue(
ret[amphora2_mock.id][constants.UNREACHABLE])

View File

@ -286,6 +286,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.AMPHORA, amp_flow.provides) self.assertIn(constants.AMPHORA, amp_flow.provides)
self.assertIn(constants.AMPHORA_ID, amp_flow.provides) self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
self.assertIn(constants.AMPHORAE, amp_flow.provides) self.assertIn(constants.AMPHORAE, amp_flow.provides)
self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides)
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
self.assertIn(constants.BASE_PORT, amp_flow.provides) self.assertIn(constants.BASE_PORT, amp_flow.provides)
self.assertIn(constants.COMPUTE_ID, amp_flow.provides) self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
@ -296,7 +297,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.VIP_SG_ID, amp_flow.provides) self.assertIn(constants.VIP_SG_ID, amp_flow.provides)
self.assertEqual(8, len(amp_flow.requires)) self.assertEqual(8, len(amp_flow.requires))
self.assertEqual(13, len(amp_flow.provides)) self.assertEqual(14, len(amp_flow.provides))
def test_get_failover_flow_standalone(self, mock_get_net_driver): def test_get_failover_flow_standalone(self, mock_get_net_driver):
failed_amphora = data_models.Amphora( failed_amphora = data_models.Amphora(
@ -320,6 +321,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.AMPHORA, amp_flow.provides) self.assertIn(constants.AMPHORA, amp_flow.provides)
self.assertIn(constants.AMPHORA_ID, amp_flow.provides) self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
self.assertIn(constants.AMPHORAE, amp_flow.provides) self.assertIn(constants.AMPHORAE, amp_flow.provides)
self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides)
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
self.assertIn(constants.BASE_PORT, amp_flow.provides) self.assertIn(constants.BASE_PORT, amp_flow.provides)
self.assertIn(constants.COMPUTE_ID, amp_flow.provides) self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
@ -330,7 +332,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.VIP_SG_ID, amp_flow.provides) self.assertIn(constants.VIP_SG_ID, amp_flow.provides)
self.assertEqual(8, len(amp_flow.requires)) self.assertEqual(8, len(amp_flow.requires))
self.assertEqual(12, len(amp_flow.provides)) self.assertEqual(13, len(amp_flow.provides))
def test_get_failover_flow_bogus_role(self, mock_get_net_driver): def test_get_failover_flow_bogus_role(self, mock_get_net_driver):
failed_amphora = data_models.Amphora(id=uuidutils.generate_uuid(), failed_amphora = data_models.Amphora(id=uuidutils.generate_uuid(),
@ -368,12 +370,30 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides)
self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides) self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides)
self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.provides)
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires)
self.assertEqual(3, len(vrrp_subflow.provides))
self.assertEqual(3, len(vrrp_subflow.requires))
def test_get_vrrp_subflow_dont_get_status(self, mock_get_net_driver):
vrrp_subflow = self.AmpFlow.get_vrrp_subflow('123',
get_amphorae_status=False)
self.assertIsInstance(vrrp_subflow, flow.Flow)
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides)
self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides)
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.requires)
self.assertEqual(2, len(vrrp_subflow.provides)) self.assertEqual(2, len(vrrp_subflow.provides))
self.assertEqual(2, len(vrrp_subflow.requires)) self.assertEqual(3, len(vrrp_subflow.requires))
def test_get_vrrp_subflow_dont_create_vrrp_group( def test_get_vrrp_subflow_dont_create_vrrp_group(
self, mock_get_net_driver): self, mock_get_net_driver):
@ -384,12 +404,14 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides)
self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides) self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides)
self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.provides)
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires)
self.assertEqual(2, len(vrrp_subflow.provides)) self.assertEqual(3, len(vrrp_subflow.provides))
self.assertEqual(2, len(vrrp_subflow.requires)) self.assertEqual(3, len(vrrp_subflow.requires))
def test_update_amphora_config_flow(self, mock_get_net_driver): def test_update_amphora_config_flow(self, mock_get_net_driver):

View File

@ -199,14 +199,16 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires) self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
self.assertIn(constants.UPDATE_DICT, amp_flow.requires) self.assertIn(constants.UPDATE_DICT, amp_flow.requires)
self.assertIn(constants.AMPHORA_ID, amp_flow.requires)
self.assertIn(constants.AMPHORAE, amp_flow.provides) self.assertIn(constants.AMPHORAE, amp_flow.provides)
self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides)
self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides) self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides)
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides) self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(2, len(amp_flow.requires), amp_flow.requires) self.assertEqual(3, len(amp_flow.requires), amp_flow.requires)
self.assertEqual(4, len(amp_flow.provides), amp_flow.provides) self.assertEqual(5, len(amp_flow.provides), amp_flow.provides)
amp_flow = self.LBFlow.get_post_lb_amp_association_flow( amp_flow = self.LBFlow.get_post_lb_amp_association_flow(
'123', constants.TOPOLOGY_ACTIVE_STANDBY) '123', constants.TOPOLOGY_ACTIVE_STANDBY)
@ -215,14 +217,16 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires) self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
self.assertIn(constants.UPDATE_DICT, amp_flow.requires) self.assertIn(constants.UPDATE_DICT, amp_flow.requires)
self.assertIn(constants.AMPHORA_ID, amp_flow.requires)
self.assertIn(constants.AMPHORAE, amp_flow.provides) self.assertIn(constants.AMPHORAE, amp_flow.provides)
self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides)
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides) self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides) self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(2, len(amp_flow.requires), amp_flow.requires) self.assertEqual(3, len(amp_flow.requires), amp_flow.requires)
self.assertEqual(4, len(amp_flow.provides), amp_flow.provides) self.assertEqual(5, len(amp_flow.provides), amp_flow.provides)
@mock.patch('octavia.common.rpc.NOTIFIER', @mock.patch('octavia.common.rpc.NOTIFIER',
new_callable=MockNOTIFIER) new_callable=MockNOTIFIER)
@ -285,6 +289,7 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIn(constants.AMPHORA_ID, create_flow.provides) self.assertIn(constants.AMPHORA_ID, create_flow.provides)
self.assertIn(constants.AMPHORA_NETWORK_CONFIG, create_flow.provides) self.assertIn(constants.AMPHORA_NETWORK_CONFIG, create_flow.provides)
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, create_flow.provides) self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, create_flow.provides)
self.assertIn(constants.AMPHORAE_STATUS, create_flow.provides)
self.assertIn(constants.COMPUTE_ID, create_flow.provides) self.assertIn(constants.COMPUTE_ID, create_flow.provides)
self.assertIn(constants.COMPUTE_OBJ, create_flow.provides) self.assertIn(constants.COMPUTE_OBJ, create_flow.provides)
self.assertIn(constants.DELTAS, create_flow.provides) self.assertIn(constants.DELTAS, create_flow.provides)
@ -296,7 +301,7 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIn(constants.ADDITIONAL_VIPS, create_flow.provides) self.assertIn(constants.ADDITIONAL_VIPS, create_flow.provides)
self.assertEqual(6, len(create_flow.requires), create_flow.requires) self.assertEqual(6, len(create_flow.requires), create_flow.requires)
self.assertEqual(17, len(create_flow.provides), self.assertEqual(18, len(create_flow.provides),
create_flow.provides) create_flow.provides)
def _test_get_failover_LB_flow_single(self, amphorae): def _test_get_failover_LB_flow_single(self, amphorae):

View File

@ -132,17 +132,35 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_amphora_repo_get.return_value = _db_amphora_mock mock_amphora_repo_get.return_value = _db_amphora_mock
mock_lb_get.return_value = _db_load_balancer_mock mock_lb_get.return_value = _db_load_balancer_mock
amphorae_status = {
_amphora_mock[constants.ID]: {
constants.UNREACHABLE: False
}
}
amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate() amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate()
amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock],
self.timeout_dict) amphorae_status, self.timeout_dict)
mock_driver.update_amphora_listeners.assert_called_once_with( mock_driver.update_amphora_listeners.assert_called_once_with(
_db_load_balancer_mock, _db_amphora_mock, self.timeout_dict) _db_load_balancer_mock, _db_amphora_mock, self.timeout_dict)
# Unreachable amp
mock_driver.reset_mock()
amphorae_status = {
_amphora_mock[constants.ID]: {
constants.UNREACHABLE: True
}
}
amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock],
amphorae_status, self.timeout_dict)
mock_driver.update_amphora_listeners.assert_not_called()
# Test exception
mock_driver.update_amphora_listeners.side_effect = Exception('boom') mock_driver.update_amphora_listeners.side_effect = Exception('boom')
amp_list_update_obj.execute(_LB_mock, 0, amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], {},
[_amphora_mock], self.timeout_dict) self.timeout_dict)
mock_amphora_repo_update.assert_called_once_with( mock_amphora_repo_update.assert_called_once_with(
_session_mock, AMP_ID, status=constants.ERROR) _session_mock, AMP_ID, status=constants.ERROR)
@ -194,37 +212,54 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_driver, mock_generate_uuid, mock_log, mock_get_session, mock_driver, mock_generate_uuid, mock_log, mock_get_session,
mock_listener_repo_get, mock_listener_repo_update, mock_listener_repo_get, mock_listener_repo_update,
mock_amphora_repo_get, mock_amphora_repo_update): mock_amphora_repo_get, mock_amphora_repo_update):
amphora_mock = mock.MagicMock()
listeners_reload_obj = ( listeners_reload_obj = (
amphora_driver_tasks.AmphoraIndexListenersReload()) amphora_driver_tasks.AmphoraIndexListenersReload())
mock_lb = mock.MagicMock() mock_lb = mock.MagicMock()
mock_listener = mock.MagicMock() mock_listener = mock.MagicMock()
mock_listener.id = '12345' mock_listener.id = '12345'
mock_amphora_repo_get.return_value = amphora_mock mock_amphora_repo_get.return_value = _amphora_mock
mock_lb_repo_get.return_value = mock_lb mock_lb_repo_get.return_value = mock_lb
mock_driver.reload.side_effect = [mock.DEFAULT, Exception('boom')] mock_driver.reload.side_effect = [mock.DEFAULT, Exception('boom')]
# Test no listeners # Test no listeners
mock_lb.listeners = None mock_lb.listeners = None
listeners_reload_obj.execute(mock_lb, 0, None) listeners_reload_obj.execute(mock_lb, 0, None, {})
mock_driver.reload.assert_not_called() mock_driver.reload.assert_not_called()
# Test with listeners # Test with listeners
mock_driver.start.reset_mock() amphorae_status = {
_amphora_mock[constants.ID]: {
constants.UNREACHABLE: False
}
}
mock_driver.reload.reset_mock()
mock_lb.listeners = [mock_listener] mock_lb.listeners = [mock_listener]
listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock],
amphorae_status,
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock,
self.timeout_dict) self.timeout_dict)
# Unreachable amp
amphorae_status = {
_amphora_mock[constants.ID]: {
constants.UNREACHABLE: True
}
}
mock_driver.reload.reset_mock()
listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock],
amphorae_status,
timeout_dict=self.timeout_dict)
mock_driver.reload.assert_not_called()
# Test with reload exception # Test with reload exception
mock_driver.reload.reset_mock() mock_driver.reload.reset_mock()
listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], {},
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock,
self.timeout_dict) self.timeout_dict)
mock_amphora_repo_update.assert_called_once_with( mock_amphora_repo_update.assert_called_once_with(
_session_mock, amphora_mock[constants.ID], _session_mock, _amphora_mock[constants.ID],
status=constants.ERROR) status=constants.ERROR)
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.' @mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
@ -804,6 +839,11 @@ class TestAmphoraDriverTasks(base.TestCase):
FAKE_INTERFACE = 'fake0' FAKE_INTERFACE = 'fake0'
mock_driver.get_interface_from_ip.side_effect = [FAKE_INTERFACE, mock_driver.get_interface_from_ip.side_effect = [FAKE_INTERFACE,
Exception('boom')] Exception('boom')]
amphorae_status = {
_amphora_mock[constants.ID]: {
constants.UNREACHABLE: False
}
}
timeout_dict = {constants.CONN_MAX_RETRIES: CONN_MAX_RETRIES, timeout_dict = {constants.CONN_MAX_RETRIES: CONN_MAX_RETRIES,
constants.CONN_RETRY_INTERVAL: CONN_RETRY_INTERVAL} constants.CONN_RETRY_INTERVAL: CONN_RETRY_INTERVAL}
@ -811,17 +851,28 @@ class TestAmphoraDriverTasks(base.TestCase):
amphora_update_vrrp_interface_obj = ( amphora_update_vrrp_interface_obj = (
amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface()) amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface())
amphora_update_vrrp_interface_obj.execute( amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], timeout_dict) 0, [_amphora_mock], amphorae_status, timeout_dict)
mock_driver.get_interface_from_ip.assert_called_once_with( mock_driver.get_interface_from_ip.assert_called_once_with(
_db_amphora_mock, _db_amphora_mock.vrrp_ip, _db_amphora_mock, _db_amphora_mock.vrrp_ip,
timeout_dict=timeout_dict) timeout_dict=timeout_dict)
mock_amphora_repo_update.assert_called_once_with( mock_amphora_repo_update.assert_called_once_with(
_session_mock, _db_amphora_mock.id, vrrp_interface=FAKE_INTERFACE) _session_mock, _db_amphora_mock.id, vrrp_interface=FAKE_INTERFACE)
# Unreachable amp
mock_driver.reset_mock()
amphorae_status = {
_amphora_mock[constants.ID]: {
constants.UNREACHABLE: True
}
}
amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], amphorae_status, timeout_dict)
mock_driver.get_interface_from_ip.assert_not_called()
# Test with an exception # Test with an exception
mock_amphora_repo_update.reset_mock() mock_amphora_repo_update.reset_mock()
amphora_update_vrrp_interface_obj.execute( amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], timeout_dict) 0, [_amphora_mock], {}, timeout_dict)
mock_amphora_repo_update.assert_called_once_with( mock_amphora_repo_update.assert_called_once_with(
_session_mock, _db_amphora_mock.id, status=constants.ERROR) _session_mock, _db_amphora_mock.id, status=constants.ERROR)
@ -872,20 +923,40 @@ class TestAmphoraDriverTasks(base.TestCase):
Exception('boom')] Exception('boom')]
mock_lb_get.return_value = _db_load_balancer_mock mock_lb_get.return_value = _db_load_balancer_mock
mock_amphora_repo_get.return_value = _db_amphora_mock mock_amphora_repo_get.return_value = _db_amphora_mock
amphorae_status = {
_amphora_mock[constants.ID]: {
constants.UNREACHABLE: False
}
}
amphora_vrrp_update_obj = ( amphora_vrrp_update_obj = (
amphora_driver_tasks.AmphoraIndexVRRPUpdate()) amphora_driver_tasks.AmphoraIndexVRRPUpdate())
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
0, [_amphora_mock], 'fakeint0', 0, [_amphora_mock], amphorae_status,
'fakeint0',
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.update_vrrp_conf.assert_called_once_with( mock_driver.update_vrrp_conf.assert_called_once_with(
_db_load_balancer_mock, amphorae_network_config, _db_amphora_mock, _db_load_balancer_mock, amphorae_network_config, _db_amphora_mock,
self.timeout_dict) self.timeout_dict)
# Unreachable amp
amphorae_status = {
_amphora_mock[constants.ID]: {
constants.UNREACHABLE: True
}
}
mock_amphora_repo_update.reset_mock()
mock_driver.update_vrrp_conf.reset_mock()
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
0, [_amphora_mock], amphorae_status,
None)
mock_driver.update_vrrp_conf.assert_not_called()
# Test with an exception # Test with an exception
mock_amphora_repo_update.reset_mock() mock_amphora_repo_update.reset_mock()
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
0, [_amphora_mock], 'fakeint0') 0, [_amphora_mock], {}, 'fakeint0')
mock_amphora_repo_update.assert_called_once_with( mock_amphora_repo_update.assert_called_once_with(
_session_mock, _db_amphora_mock.id, status=constants.ERROR) _session_mock, _db_amphora_mock.id, status=constants.ERROR)
@ -916,19 +987,36 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_amphora_repo_get, mock_amphora_repo_get,
mock_amphora_repo_update): mock_amphora_repo_update):
mock_amphora_repo_get.return_value = _db_amphora_mock mock_amphora_repo_get.return_value = _db_amphora_mock
amphorae_status = {
_amphora_mock[constants.ID]: {
constants.UNREACHABLE: False
}
}
amphora_vrrp_start_obj = ( amphora_vrrp_start_obj = (
amphora_driver_tasks.AmphoraIndexVRRPStart()) amphora_driver_tasks.AmphoraIndexVRRPStart())
mock_driver.start_vrrp_service.side_effect = [mock.DEFAULT, mock_driver.start_vrrp_service.side_effect = [mock.DEFAULT,
Exception('boom')] Exception('boom')]
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status,
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with( mock_driver.start_vrrp_service.assert_called_once_with(
_db_amphora_mock, self.timeout_dict) _db_amphora_mock, self.timeout_dict)
# Unreachable amp
mock_driver.start_vrrp_service.reset_mock()
amphorae_status = {
_amphora_mock[constants.ID]: {
constants.UNREACHABLE: True
}
}
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status,
timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_not_called()
# Test with a start exception # Test with a start exception
mock_driver.start_vrrp_service.reset_mock() mock_driver.start_vrrp_service.reset_mock()
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphora_vrrp_start_obj.execute(0, [_amphora_mock], {},
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with( mock_driver.start_vrrp_service.assert_called_once_with(
_db_amphora_mock, self.timeout_dict) _db_amphora_mock, self.timeout_dict)
@ -1006,3 +1094,74 @@ class TestAmphoraDriverTasks(base.TestCase):
self.assertRaises(driver_except.TimeOutException, self.assertRaises(driver_except.TimeOutException,
amp_config_update_obj.execute, amp_config_update_obj.execute,
_amphora_mock, flavor) _amphora_mock, flavor)
def test_amphorae_get_connectivity_status(self,
mock_driver,
mock_generate_uuid,
mock_log,
mock_get_session,
mock_listener_repo_get,
mock_listener_repo_update,
mock_amphora_repo_get,
mock_amphora_repo_update):
amphora1_mock = mock.MagicMock()
amphora1_mock[constants.ID] = 'id1'
amphora2_mock = mock.MagicMock()
amphora2_mock[constants.ID] = 'id2'
db_amphora1_mock = mock.Mock()
db_amphora2_mock = mock.Mock()
amp_get_connectivity_status = (
amphora_driver_tasks.AmphoraeGetConnectivityStatus())
# All amphorae reachable
mock_amphora_repo_get.side_effect = [
db_amphora1_mock,
db_amphora2_mock]
mock_driver.check.return_value = None
ret = amp_get_connectivity_status.execute(
[amphora1_mock, amphora2_mock],
amphora1_mock[constants.ID],
timeout_dict=self.timeout_dict)
mock_driver.check.assert_has_calls(
[mock.call(db_amphora1_mock, timeout_dict=self.timeout_dict),
mock.call(db_amphora2_mock, timeout_dict=self.timeout_dict)])
self.assertFalse(
ret[amphora1_mock[constants.ID]][constants.UNREACHABLE])
self.assertFalse(
ret[amphora2_mock[constants.ID]][constants.UNREACHABLE])
# amphora1 unreachable
mock_driver.check.reset_mock()
mock_amphora_repo_get.side_effect = [
db_amphora1_mock,
db_amphora2_mock]
mock_driver.check.side_effect = [
driver_except.TimeOutException, None]
self.assertRaises(driver_except.TimeOutException,
amp_get_connectivity_status.execute,
[amphora1_mock, amphora2_mock],
amphora1_mock[constants.ID],
timeout_dict=self.timeout_dict)
mock_driver.check.assert_called_with(
db_amphora1_mock, timeout_dict=self.timeout_dict)
# amphora2 unreachable
mock_driver.check.reset_mock()
mock_amphora_repo_get.side_effect = [
db_amphora1_mock,
db_amphora2_mock]
mock_driver.check.side_effect = [
None, driver_except.TimeOutException]
ret = amp_get_connectivity_status.execute(
[amphora1_mock, amphora2_mock],
amphora1_mock[constants.ID],
timeout_dict=self.timeout_dict)
mock_driver.check.assert_has_calls(
[mock.call(db_amphora1_mock, timeout_dict=self.timeout_dict),
mock.call(db_amphora2_mock, timeout_dict=self.timeout_dict)])
self.assertFalse(
ret[amphora1_mock[constants.ID]][constants.UNREACHABLE])
self.assertTrue(
ret[amphora2_mock[constants.ID]][constants.UNREACHABLE])

View File

@ -0,0 +1,7 @@
---
fixes:
- |
Reduce the duration of the failovers of ACTIVE_STANDBY load balancers. Many
updates of an unreachable amphora may have been attempted during a
failover, now if an amphora is not reachable at the first update, the other
updates are skipped.