Merge "Reduce duration of failovers with amphora in ERROR" into stable/xena
This commit is contained in:
commit
3b3698782f
@ -14,6 +14,9 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import abc
|
import abc
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from octavia.db import models as db_models
|
||||||
|
|
||||||
|
|
||||||
class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta):
|
class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta):
|
||||||
@ -231,6 +234,19 @@ class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta):
|
|||||||
:type timeout_dict: dict
|
:type timeout_dict: dict
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def check(self, amphora: db_models.Amphora,
|
||||||
|
timeout_dict: Optional[dict] = None):
|
||||||
|
"""Check connectivity to the amphora.
|
||||||
|
|
||||||
|
:param amphora: The amphora to query.
|
||||||
|
:param timeout_dict: Dictionary of timeout values for calls to the
|
||||||
|
amphora. May contain: req_conn_timeout,
|
||||||
|
req_read_timeout, conn_max_retries,
|
||||||
|
conn_retry_interval
|
||||||
|
:raises TimeOutException: The amphora didn't reply
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
class VRRPDriverMixin(object, metaclass=abc.ABCMeta):
|
class VRRPDriverMixin(object, metaclass=abc.ABCMeta):
|
||||||
"""Abstract mixin class for VRRP support in loadbalancer amphorae
|
"""Abstract mixin class for VRRP support in loadbalancer amphorae
|
||||||
|
@ -17,6 +17,7 @@ import hashlib
|
|||||||
import os
|
import os
|
||||||
import ssl
|
import ssl
|
||||||
import time
|
import time
|
||||||
|
from typing import Optional
|
||||||
import warnings
|
import warnings
|
||||||
|
|
||||||
from oslo_context import context as oslo_context
|
from oslo_context import context as oslo_context
|
||||||
@ -38,6 +39,7 @@ from octavia.common.jinja.lvs import jinja_cfg as jinja_udp_cfg
|
|||||||
from octavia.common.tls_utils import cert_parser
|
from octavia.common.tls_utils import cert_parser
|
||||||
from octavia.common import utils
|
from octavia.common import utils
|
||||||
from octavia.db import api as db_apis
|
from octavia.db import api as db_apis
|
||||||
|
from octavia.db import models as db_models
|
||||||
from octavia.db import repositories as repo
|
from octavia.db import repositories as repo
|
||||||
from octavia.network import data_models as network_models
|
from octavia.network import data_models as network_models
|
||||||
|
|
||||||
@ -115,6 +117,11 @@ class HaproxyAmphoraLoadBalancerDriver(
|
|||||||
amphora.id, amphora.api_version)
|
amphora.id, amphora.api_version)
|
||||||
return list(map(int, amphora.api_version.split('.')))
|
return list(map(int, amphora.api_version.split('.')))
|
||||||
|
|
||||||
|
def check(self, amphora: db_models.Amphora,
|
||||||
|
timeout_dict: Optional[dict] = None):
|
||||||
|
"""Check connectivity to the amphora."""
|
||||||
|
self._populate_amphora_api_version(amphora, timeout_dict)
|
||||||
|
|
||||||
def update_amphora_listeners(self, loadbalancer, amphora,
|
def update_amphora_listeners(self, loadbalancer, amphora,
|
||||||
timeout_dict=None):
|
timeout_dict=None):
|
||||||
"""Update the amphora with a new configuration.
|
"""Update the amphora with a new configuration.
|
||||||
@ -635,15 +642,15 @@ class HaproxyAmphoraLoadBalancerDriver(
|
|||||||
req_read_timeout, conn_max_retries,
|
req_read_timeout, conn_max_retries,
|
||||||
conn_retry_interval
|
conn_retry_interval
|
||||||
:type timeout_dict: dict
|
:type timeout_dict: dict
|
||||||
:returns: None if not found, the interface name string if found.
|
:returns: the interface name string if found.
|
||||||
|
:raises octavia.amphorae.drivers.haproxy.exceptions.NotFound:
|
||||||
|
No interface found on the amphora
|
||||||
|
:raises TimeOutException: The amphora didn't reply
|
||||||
"""
|
"""
|
||||||
try:
|
self._populate_amphora_api_version(amphora, timeout_dict)
|
||||||
self._populate_amphora_api_version(amphora, timeout_dict)
|
response_json = self.clients[amphora.api_version].get_interface(
|
||||||
response_json = self.clients[amphora.api_version].get_interface(
|
amphora, ip_address, timeout_dict, log_error=False)
|
||||||
amphora, ip_address, timeout_dict, log_error=False)
|
return response_json.get('interface', None)
|
||||||
return response_json.get('interface', None)
|
|
||||||
except (exc.NotFound, driver_except.TimeOutException):
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
# Check a custom hostname
|
# Check a custom hostname
|
||||||
|
@ -195,3 +195,6 @@ class NoopAmphoraLoadBalancerDriver(
|
|||||||
|
|
||||||
def reload_vrrp_service(self, loadbalancer):
|
def reload_vrrp_service(self, loadbalancer):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def check(self, amphora, timeout_dict=None):
|
||||||
|
pass
|
||||||
|
@ -310,6 +310,7 @@ AMPHORA_INDEX = 'amphora_index'
|
|||||||
AMPHORA_NETWORK_CONFIG = 'amphora_network_config'
|
AMPHORA_NETWORK_CONFIG = 'amphora_network_config'
|
||||||
AMPHORAE = 'amphorae'
|
AMPHORAE = 'amphorae'
|
||||||
AMPHORAE_NETWORK_CONFIG = 'amphorae_network_config'
|
AMPHORAE_NETWORK_CONFIG = 'amphorae_network_config'
|
||||||
|
AMPHORAE_STATUS = 'amphorae_status'
|
||||||
AMPS_DATA = 'amps_data'
|
AMPS_DATA = 'amps_data'
|
||||||
ANTI_AFFINITY = 'anti-affinity'
|
ANTI_AFFINITY = 'anti-affinity'
|
||||||
ATTEMPT_NUMBER = 'attempt_number'
|
ATTEMPT_NUMBER = 'attempt_number'
|
||||||
@ -381,6 +382,7 @@ MESSAGE = 'message'
|
|||||||
NAME = 'name'
|
NAME = 'name'
|
||||||
NETWORK = 'network'
|
NETWORK = 'network'
|
||||||
NETWORK_ID = 'network_id'
|
NETWORK_ID = 'network_id'
|
||||||
|
NEW_AMPHORA_ID = 'new_amphora_id'
|
||||||
NEXTHOP = 'nexthop'
|
NEXTHOP = 'nexthop'
|
||||||
NICS = 'nics'
|
NICS = 'nics'
|
||||||
OBJECT = 'object'
|
OBJECT = 'object'
|
||||||
@ -427,6 +429,7 @@ TLS_CERTIFICATE_ID = 'tls_certificate_id'
|
|||||||
TLS_CONTAINER_ID = 'tls_container_id'
|
TLS_CONTAINER_ID = 'tls_container_id'
|
||||||
TOPOLOGY = 'topology'
|
TOPOLOGY = 'topology'
|
||||||
TOTAL_CONNECTIONS = 'total_connections'
|
TOTAL_CONNECTIONS = 'total_connections'
|
||||||
|
UNREACHABLE = 'unreachable'
|
||||||
UPDATED_AT = 'updated_at'
|
UPDATED_AT = 'updated_at'
|
||||||
UPDATE_DICT = 'update_dict'
|
UPDATE_DICT = 'update_dict'
|
||||||
UPDATED_PORTS = 'updated_ports'
|
UPDATED_PORTS = 'updated_ports'
|
||||||
@ -554,6 +557,7 @@ ADMIN_DOWN_PORT = 'admin-down-port'
|
|||||||
AMPHORA_POST_VIP_PLUG = 'amphora-post-vip-plug'
|
AMPHORA_POST_VIP_PLUG = 'amphora-post-vip-plug'
|
||||||
AMPHORA_RELOAD_LISTENER = 'amphora-reload-listener'
|
AMPHORA_RELOAD_LISTENER = 'amphora-reload-listener'
|
||||||
AMPHORA_TO_ERROR_ON_REVERT = 'amphora-to-error-on-revert'
|
AMPHORA_TO_ERROR_ON_REVERT = 'amphora-to-error-on-revert'
|
||||||
|
AMPHORAE_GET_CONNECTIVITY_STATUS = 'amphorae-get-connectivity-status'
|
||||||
AMPHORAE_POST_NETWORK_PLUG = 'amphorae-post-network-plug'
|
AMPHORAE_POST_NETWORK_PLUG = 'amphorae-post-network-plug'
|
||||||
ATTACH_PORT = 'attach-port'
|
ATTACH_PORT = 'attach-port'
|
||||||
CALCULATE_AMPHORA_DELTA = 'calculate-amphora-delta'
|
CALCULATE_AMPHORA_DELTA = 'calculate-amphora-delta'
|
||||||
|
@ -240,7 +240,8 @@ class AmphoraFlows(object):
|
|||||||
return delete_amphora_flow
|
return delete_amphora_flow
|
||||||
|
|
||||||
def get_vrrp_subflow(self, prefix, timeout_dict=None,
|
def get_vrrp_subflow(self, prefix, timeout_dict=None,
|
||||||
create_vrrp_group=True):
|
create_vrrp_group=True,
|
||||||
|
get_amphorae_status=True):
|
||||||
sf_name = prefix + '-' + constants.GET_VRRP_SUBFLOW
|
sf_name = prefix + '-' + constants.GET_VRRP_SUBFLOW
|
||||||
vrrp_subflow = linear_flow.Flow(sf_name)
|
vrrp_subflow = linear_flow.Flow(sf_name)
|
||||||
|
|
||||||
@ -256,6 +257,17 @@ class AmphoraFlows(object):
|
|||||||
requires=constants.LOADBALANCER_ID,
|
requires=constants.LOADBALANCER_ID,
|
||||||
provides=constants.AMPHORAE_NETWORK_CONFIG))
|
provides=constants.AMPHORAE_NETWORK_CONFIG))
|
||||||
|
|
||||||
|
if get_amphorae_status:
|
||||||
|
# Get the amphorae_status dict in case the caller hasn't fetched
|
||||||
|
# it yet.
|
||||||
|
vrrp_subflow.add(
|
||||||
|
amphora_driver_tasks.AmphoraeGetConnectivityStatus(
|
||||||
|
name=constants.AMPHORAE_GET_CONNECTIVITY_STATUS,
|
||||||
|
requires=constants.AMPHORAE,
|
||||||
|
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
|
||||||
|
inject={constants.TIMEOUT_DICT: timeout_dict},
|
||||||
|
provides=constants.AMPHORAE_STATUS))
|
||||||
|
|
||||||
# VRRP update needs to be run on all amphora to update
|
# VRRP update needs to be run on all amphora to update
|
||||||
# their peer configurations. So parallelize this with an
|
# their peer configurations. So parallelize this with an
|
||||||
# unordered subflow.
|
# unordered subflow.
|
||||||
@ -266,7 +278,7 @@ class AmphoraFlows(object):
|
|||||||
|
|
||||||
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
|
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
|
||||||
name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF,
|
name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF,
|
||||||
requires=constants.AMPHORAE,
|
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
|
||||||
inject={constants.AMPHORA_INDEX: 0,
|
inject={constants.AMPHORA_INDEX: 0,
|
||||||
constants.TIMEOUT_DICT: timeout_dict},
|
constants.TIMEOUT_DICT: timeout_dict},
|
||||||
provides=constants.AMP_VRRP_INT))
|
provides=constants.AMP_VRRP_INT))
|
||||||
@ -275,13 +287,13 @@ class AmphoraFlows(object):
|
|||||||
name=sf_name + '-0-' + constants.AMP_VRRP_UPDATE,
|
name=sf_name + '-0-' + constants.AMP_VRRP_UPDATE,
|
||||||
requires=(constants.LOADBALANCER_ID,
|
requires=(constants.LOADBALANCER_ID,
|
||||||
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
|
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
|
||||||
constants.AMP_VRRP_INT),
|
constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT),
|
||||||
inject={constants.AMPHORA_INDEX: 0,
|
inject={constants.AMPHORA_INDEX: 0,
|
||||||
constants.TIMEOUT_DICT: timeout_dict}))
|
constants.TIMEOUT_DICT: timeout_dict}))
|
||||||
|
|
||||||
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
|
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
|
||||||
name=sf_name + '-0-' + constants.AMP_VRRP_START,
|
name=sf_name + '-0-' + constants.AMP_VRRP_START,
|
||||||
requires=constants.AMPHORAE,
|
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
|
||||||
inject={constants.AMPHORA_INDEX: 0,
|
inject={constants.AMPHORA_INDEX: 0,
|
||||||
constants.TIMEOUT_DICT: timeout_dict}))
|
constants.TIMEOUT_DICT: timeout_dict}))
|
||||||
|
|
||||||
@ -289,7 +301,7 @@ class AmphoraFlows(object):
|
|||||||
|
|
||||||
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
|
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
|
||||||
name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF,
|
name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF,
|
||||||
requires=constants.AMPHORAE,
|
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
|
||||||
inject={constants.AMPHORA_INDEX: 1,
|
inject={constants.AMPHORA_INDEX: 1,
|
||||||
constants.TIMEOUT_DICT: timeout_dict},
|
constants.TIMEOUT_DICT: timeout_dict},
|
||||||
provides=constants.AMP_VRRP_INT))
|
provides=constants.AMP_VRRP_INT))
|
||||||
@ -298,12 +310,12 @@ class AmphoraFlows(object):
|
|||||||
name=sf_name + '-1-' + constants.AMP_VRRP_UPDATE,
|
name=sf_name + '-1-' + constants.AMP_VRRP_UPDATE,
|
||||||
requires=(constants.LOADBALANCER_ID,
|
requires=(constants.LOADBALANCER_ID,
|
||||||
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
|
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
|
||||||
constants.AMP_VRRP_INT),
|
constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT),
|
||||||
inject={constants.AMPHORA_INDEX: 1,
|
inject={constants.AMPHORA_INDEX: 1,
|
||||||
constants.TIMEOUT_DICT: timeout_dict}))
|
constants.TIMEOUT_DICT: timeout_dict}))
|
||||||
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
|
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
|
||||||
name=sf_name + '-1-' + constants.AMP_VRRP_START,
|
name=sf_name + '-1-' + constants.AMP_VRRP_START,
|
||||||
requires=constants.AMPHORAE,
|
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
|
||||||
inject={constants.AMPHORA_INDEX: 1,
|
inject={constants.AMPHORA_INDEX: 1,
|
||||||
constants.TIMEOUT_DICT: timeout_dict}))
|
constants.TIMEOUT_DICT: timeout_dict}))
|
||||||
|
|
||||||
@ -551,6 +563,14 @@ class AmphoraFlows(object):
|
|||||||
constants.CONN_RETRY_INTERVAL:
|
constants.CONN_RETRY_INTERVAL:
|
||||||
CONF.haproxy_amphora.active_connection_rety_interval}
|
CONF.haproxy_amphora.active_connection_rety_interval}
|
||||||
|
|
||||||
|
failover_amp_flow.add(
|
||||||
|
amphora_driver_tasks.AmphoraeGetConnectivityStatus(
|
||||||
|
name=constants.AMPHORAE_GET_CONNECTIVITY_STATUS,
|
||||||
|
requires=constants.AMPHORAE,
|
||||||
|
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
|
||||||
|
inject={constants.TIMEOUT_DICT: timeout_dict},
|
||||||
|
provides=constants.AMPHORAE_STATUS))
|
||||||
|
|
||||||
# Listeners update needs to be run on all amphora to update
|
# Listeners update needs to be run on all amphora to update
|
||||||
# their peer configurations. So parallelize this with an
|
# their peer configurations. So parallelize this with an
|
||||||
# unordered subflow.
|
# unordered subflow.
|
||||||
@ -561,7 +581,8 @@ class AmphoraFlows(object):
|
|||||||
update_amps_subflow.add(
|
update_amps_subflow.add(
|
||||||
amphora_driver_tasks.AmphoraIndexListenerUpdate(
|
amphora_driver_tasks.AmphoraIndexListenerUpdate(
|
||||||
name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE,
|
name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE,
|
||||||
requires=(constants.LOADBALANCER, constants.AMPHORAE),
|
requires=(constants.LOADBALANCER, constants.AMPHORAE,
|
||||||
|
constants.AMPHORAE_STATUS),
|
||||||
inject={constants.AMPHORA_INDEX: amp_index,
|
inject={constants.AMPHORA_INDEX: amp_index,
|
||||||
constants.TIMEOUT_DICT: timeout_dict}))
|
constants.TIMEOUT_DICT: timeout_dict}))
|
||||||
|
|
||||||
@ -571,7 +592,8 @@ class AmphoraFlows(object):
|
|||||||
if lb_amp_count == 2:
|
if lb_amp_count == 2:
|
||||||
failover_amp_flow.add(
|
failover_amp_flow.add(
|
||||||
self.get_vrrp_subflow(constants.GET_VRRP_SUBFLOW,
|
self.get_vrrp_subflow(constants.GET_VRRP_SUBFLOW,
|
||||||
timeout_dict, create_vrrp_group=False))
|
timeout_dict, create_vrrp_group=False,
|
||||||
|
get_amphorae_status=False))
|
||||||
|
|
||||||
# Reload the listener. This needs to be done here because
|
# Reload the listener. This needs to be done here because
|
||||||
# it will create the required haproxy check scripts for
|
# it will create the required haproxy check scripts for
|
||||||
@ -587,7 +609,8 @@ class AmphoraFlows(object):
|
|||||||
amphora_driver_tasks.AmphoraIndexListenersReload(
|
amphora_driver_tasks.AmphoraIndexListenersReload(
|
||||||
name=(str(amp_index) + '-' +
|
name=(str(amp_index) + '-' +
|
||||||
constants.AMPHORA_RELOAD_LISTENER),
|
constants.AMPHORA_RELOAD_LISTENER),
|
||||||
requires=(constants.LOADBALANCER, constants.AMPHORAE),
|
requires=(constants.LOADBALANCER, constants.AMPHORAE,
|
||||||
|
constants.AMPHORAE_STATUS),
|
||||||
inject={constants.AMPHORA_INDEX: amp_index,
|
inject={constants.AMPHORA_INDEX: amp_index,
|
||||||
constants.TIMEOUT_DICT: timeout_dict}))
|
constants.TIMEOUT_DICT: timeout_dict}))
|
||||||
|
|
||||||
|
@ -621,6 +621,14 @@ class LoadBalancerFlows(object):
|
|||||||
requires=constants.LOADBALANCER_ID,
|
requires=constants.LOADBALANCER_ID,
|
||||||
provides=constants.AMPHORAE))
|
provides=constants.AMPHORAE))
|
||||||
|
|
||||||
|
failover_LB_flow.add(
|
||||||
|
amphora_driver_tasks.AmphoraeGetConnectivityStatus(
|
||||||
|
name=(new_amp_role + '-' +
|
||||||
|
constants.AMPHORAE_GET_CONNECTIVITY_STATUS),
|
||||||
|
requires=constants.AMPHORAE,
|
||||||
|
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
|
||||||
|
provides=constants.AMPHORAE_STATUS))
|
||||||
|
|
||||||
# Listeners update needs to be run on all amphora to update
|
# Listeners update needs to be run on all amphora to update
|
||||||
# their peer configurations. So parallelize this with an
|
# their peer configurations. So parallelize this with an
|
||||||
# unordered subflow.
|
# unordered subflow.
|
||||||
@ -635,14 +643,16 @@ class LoadBalancerFlows(object):
|
|||||||
amphora_driver_tasks.AmphoraIndexListenerUpdate(
|
amphora_driver_tasks.AmphoraIndexListenerUpdate(
|
||||||
name=(constants.AMPHORA + '-0-' +
|
name=(constants.AMPHORA + '-0-' +
|
||||||
constants.AMP_LISTENER_UPDATE),
|
constants.AMP_LISTENER_UPDATE),
|
||||||
requires=(constants.LOADBALANCER, constants.AMPHORAE),
|
requires=(constants.LOADBALANCER, constants.AMPHORAE,
|
||||||
|
constants.AMPHORAE_STATUS),
|
||||||
inject={constants.AMPHORA_INDEX: 0,
|
inject={constants.AMPHORA_INDEX: 0,
|
||||||
constants.TIMEOUT_DICT: timeout_dict}))
|
constants.TIMEOUT_DICT: timeout_dict}))
|
||||||
update_amps_subflow.add(
|
update_amps_subflow.add(
|
||||||
amphora_driver_tasks.AmphoraIndexListenerUpdate(
|
amphora_driver_tasks.AmphoraIndexListenerUpdate(
|
||||||
name=(constants.AMPHORA + '-1-' +
|
name=(constants.AMPHORA + '-1-' +
|
||||||
constants.AMP_LISTENER_UPDATE),
|
constants.AMP_LISTENER_UPDATE),
|
||||||
requires=(constants.LOADBALANCER, constants.AMPHORAE),
|
requires=(constants.LOADBALANCER, constants.AMPHORAE,
|
||||||
|
constants.AMPHORAE_STATUS),
|
||||||
inject={constants.AMPHORA_INDEX: 1,
|
inject={constants.AMPHORA_INDEX: 1,
|
||||||
constants.TIMEOUT_DICT: timeout_dict}))
|
constants.TIMEOUT_DICT: timeout_dict}))
|
||||||
|
|
||||||
@ -651,7 +661,8 @@ class LoadBalancerFlows(object):
|
|||||||
# Configure and enable keepalived in the amphora
|
# Configure and enable keepalived in the amphora
|
||||||
failover_LB_flow.add(self.amp_flows.get_vrrp_subflow(
|
failover_LB_flow.add(self.amp_flows.get_vrrp_subflow(
|
||||||
new_amp_role + '-' + constants.GET_VRRP_SUBFLOW,
|
new_amp_role + '-' + constants.GET_VRRP_SUBFLOW,
|
||||||
timeout_dict, create_vrrp_group=False))
|
timeout_dict, create_vrrp_group=False,
|
||||||
|
get_amphorae_status=False))
|
||||||
|
|
||||||
# #### End of standby ####
|
# #### End of standby ####
|
||||||
|
|
||||||
|
@ -12,6 +12,8 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
#
|
#
|
||||||
|
from typing import List
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from cryptography import fernet
|
from cryptography import fernet
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
@ -73,10 +75,18 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask):
|
|||||||
"""Task to update the listeners on one amphora."""
|
"""Task to update the listeners on one amphora."""
|
||||||
|
|
||||||
def execute(self, loadbalancer, amphora_index, amphorae,
|
def execute(self, loadbalancer, amphora_index, amphorae,
|
||||||
timeout_dict=None):
|
amphorae_status: dict, timeout_dict=None):
|
||||||
# Note, we don't want this to cause a revert as it may be used
|
# Note, we don't want this to cause a revert as it may be used
|
||||||
# in a failover flow with both amps failing. Skip it and let
|
# in a failover flow with both amps failing. Skip it and let
|
||||||
# health manager fix it.
|
# health manager fix it.
|
||||||
|
|
||||||
|
amphora_id = amphorae[amphora_index].id
|
||||||
|
amphora_status = amphorae_status.get(amphora_id, {})
|
||||||
|
if amphora_status.get(constants.UNREACHABLE):
|
||||||
|
LOG.warning("Skipping listener update because amphora %s "
|
||||||
|
"is not reachable.", amphora_id)
|
||||||
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Make sure we have a fresh load balancer object
|
# Make sure we have a fresh load balancer object
|
||||||
loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(),
|
loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(),
|
||||||
@ -84,7 +94,6 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask):
|
|||||||
self.amphora_driver.update_amphora_listeners(
|
self.amphora_driver.update_amphora_listeners(
|
||||||
loadbalancer, amphorae[amphora_index], timeout_dict)
|
loadbalancer, amphorae[amphora_index], timeout_dict)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
amphora_id = amphorae[amphora_index].id
|
|
||||||
LOG.error('Failed to update listeners on amphora %s. Skipping '
|
LOG.error('Failed to update listeners on amphora %s. Skipping '
|
||||||
'this amphora as it is failing to update due to: %s',
|
'this amphora as it is failing to update due to: %s',
|
||||||
amphora_id, str(e))
|
amphora_id, str(e))
|
||||||
@ -129,14 +138,23 @@ class AmphoraIndexListenersReload(BaseAmphoraTask):
|
|||||||
"""Task to reload all listeners on an amphora."""
|
"""Task to reload all listeners on an amphora."""
|
||||||
|
|
||||||
def execute(self, loadbalancer, amphora_index, amphorae,
|
def execute(self, loadbalancer, amphora_index, amphorae,
|
||||||
timeout_dict=None):
|
amphorae_status: dict, timeout_dict=None):
|
||||||
"""Execute listener reload routines for listeners on an amphora."""
|
"""Execute listener reload routines for listeners on an amphora."""
|
||||||
|
if amphorae is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
amphora_id = amphorae[amphora_index].id
|
||||||
|
amphora_status = amphorae_status.get(amphora_id, {})
|
||||||
|
if amphora_status.get(constants.UNREACHABLE):
|
||||||
|
LOG.warning("Skipping listener reload because amphora %s "
|
||||||
|
"is not reachable.", amphora_id)
|
||||||
|
return
|
||||||
|
|
||||||
if loadbalancer.listeners:
|
if loadbalancer.listeners:
|
||||||
try:
|
try:
|
||||||
self.amphora_driver.reload(
|
self.amphora_driver.reload(
|
||||||
loadbalancer, amphorae[amphora_index], timeout_dict)
|
loadbalancer, amphorae[amphora_index], timeout_dict)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
amphora_id = amphorae[amphora_index].id
|
|
||||||
LOG.warning('Failed to reload listeners on amphora %s. '
|
LOG.warning('Failed to reload listeners on amphora %s. '
|
||||||
'Skipping this amphora as it is failing to '
|
'Skipping this amphora as it is failing to '
|
||||||
'reload due to: %s', amphora_id, str(e))
|
'reload due to: %s', amphora_id, str(e))
|
||||||
@ -305,8 +323,15 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask):
|
|||||||
class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask):
|
class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask):
|
||||||
"""Task to get and update the VRRP interface device name from amphora."""
|
"""Task to get and update the VRRP interface device name from amphora."""
|
||||||
|
|
||||||
def execute(self, amphora_index, amphorae, timeout_dict=None):
|
def execute(self, amphora_index, amphorae, amphorae_status: dict,
|
||||||
|
timeout_dict=None):
|
||||||
amphora_id = amphorae[amphora_index].id
|
amphora_id = amphorae[amphora_index].id
|
||||||
|
amphora_status = amphorae_status.get(amphora_id, {})
|
||||||
|
if amphora_status.get(constants.UNREACHABLE):
|
||||||
|
LOG.warning("Skipping VRRP interface update because amphora %s "
|
||||||
|
"is not reachable.", amphora_id)
|
||||||
|
return None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
interface = self.amphora_driver.get_interface_from_ip(
|
interface = self.amphora_driver.get_interface_from_ip(
|
||||||
amphorae[amphora_index], amphorae[amphora_index].vrrp_ip,
|
amphorae[amphora_index], amphorae[amphora_index].vrrp_ip,
|
||||||
@ -354,14 +379,21 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask):
|
|||||||
"""Task to update the VRRP configuration of an amphora."""
|
"""Task to update the VRRP configuration of an amphora."""
|
||||||
|
|
||||||
def execute(self, loadbalancer_id, amphorae_network_config, amphora_index,
|
def execute(self, loadbalancer_id, amphorae_network_config, amphora_index,
|
||||||
amphorae, amp_vrrp_int, timeout_dict=None):
|
amphorae, amphorae_status: dict, amp_vrrp_int: Optional[str],
|
||||||
|
timeout_dict=None):
|
||||||
"""Execute update_vrrp_conf."""
|
"""Execute update_vrrp_conf."""
|
||||||
loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(),
|
|
||||||
id=loadbalancer_id)
|
|
||||||
# Note, we don't want this to cause a revert as it may be used
|
# Note, we don't want this to cause a revert as it may be used
|
||||||
# in a failover flow with both amps failing. Skip it and let
|
# in a failover flow with both amps failing. Skip it and let
|
||||||
# health manager fix it.
|
# health manager fix it.
|
||||||
amphora_id = amphorae[amphora_index].id
|
amphora_id = amphorae[amphora_index].id
|
||||||
|
amphora_status = amphorae_status.get(amphora_id, {})
|
||||||
|
if amphora_status.get(constants.UNREACHABLE):
|
||||||
|
LOG.warning("Skipping VRRP configuration because amphora %s "
|
||||||
|
"is not reachable.", amphora_id)
|
||||||
|
return
|
||||||
|
|
||||||
|
loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(),
|
||||||
|
id=loadbalancer_id)
|
||||||
amphorae[amphora_index].vrrp_interface = amp_vrrp_int
|
amphorae[amphora_index].vrrp_interface = amp_vrrp_int
|
||||||
try:
|
try:
|
||||||
self.amphora_driver.update_vrrp_conf(
|
self.amphora_driver.update_vrrp_conf(
|
||||||
@ -394,8 +426,15 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask):
|
|||||||
This will reload keepalived if it is already running.
|
This will reload keepalived if it is already running.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def execute(self, amphora_index, amphorae, timeout_dict=None):
|
def execute(self, amphora_index, amphorae, amphorae_status: dict,
|
||||||
|
timeout_dict=None):
|
||||||
amphora_id = amphorae[amphora_index].id
|
amphora_id = amphorae[amphora_index].id
|
||||||
|
amphora_status = amphorae_status.get(amphora_id, {})
|
||||||
|
if amphora_status.get(constants.UNREACHABLE):
|
||||||
|
LOG.warning("Skipping VRRP start because amphora %s "
|
||||||
|
"is not reachable.", amphora_id)
|
||||||
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.amphora_driver.start_vrrp_service(amphorae[amphora_index],
|
self.amphora_driver.start_vrrp_service(amphorae[amphora_index],
|
||||||
timeout_dict)
|
timeout_dict)
|
||||||
@ -451,3 +490,40 @@ class AmphoraConfigUpdate(BaseAmphoraTask):
|
|||||||
LOG.error('Amphora %s does not support agent configuration '
|
LOG.error('Amphora %s does not support agent configuration '
|
||||||
'update. Please update the amphora image for this '
|
'update. Please update the amphora image for this '
|
||||||
'amphora. Skipping.', amphora.id)
|
'amphora. Skipping.', amphora.id)
|
||||||
|
|
||||||
|
|
||||||
|
class AmphoraeGetConnectivityStatus(BaseAmphoraTask):
|
||||||
|
"""Task that checks amphorae connectivity status.
|
||||||
|
|
||||||
|
Check and return the connectivity status of both amphorae in ACTIVE STANDBY
|
||||||
|
load balancers
|
||||||
|
"""
|
||||||
|
|
||||||
|
def execute(self, amphorae: List[dict], new_amphora_id: str,
|
||||||
|
timeout_dict=None):
|
||||||
|
amphorae_status = {}
|
||||||
|
|
||||||
|
for amphora in amphorae:
|
||||||
|
amphora_id = amphora.id
|
||||||
|
amphorae_status[amphora_id] = {}
|
||||||
|
|
||||||
|
session = db_apis.get_session()
|
||||||
|
with session.begin():
|
||||||
|
db_amp = self.amphora_repo.get(session, id=amphora_id)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Verify if the amphora is reachable
|
||||||
|
self.amphora_driver.check(db_amp, timeout_dict=timeout_dict)
|
||||||
|
except Exception as e:
|
||||||
|
LOG.exception("Cannot get status for amphora %s",
|
||||||
|
amphora_id)
|
||||||
|
# In case it fails and the tested amphora is the newly created
|
||||||
|
# amphora, it's not a normal error handling, re-raise the
|
||||||
|
# exception
|
||||||
|
if amphora_id == new_amphora_id:
|
||||||
|
raise e
|
||||||
|
amphorae_status[amphora_id][constants.UNREACHABLE] = True
|
||||||
|
else:
|
||||||
|
amphorae_status[amphora_id][constants.UNREACHABLE] = False
|
||||||
|
|
||||||
|
return amphorae_status
|
||||||
|
@ -226,7 +226,8 @@ class AmphoraFlows(object):
|
|||||||
return delete_amphora_flow
|
return delete_amphora_flow
|
||||||
|
|
||||||
def get_vrrp_subflow(self, prefix, timeout_dict=None,
|
def get_vrrp_subflow(self, prefix, timeout_dict=None,
|
||||||
create_vrrp_group=True):
|
create_vrrp_group=True,
|
||||||
|
get_amphorae_status=True):
|
||||||
sf_name = prefix + '-' + constants.GET_VRRP_SUBFLOW
|
sf_name = prefix + '-' + constants.GET_VRRP_SUBFLOW
|
||||||
vrrp_subflow = linear_flow.Flow(sf_name)
|
vrrp_subflow = linear_flow.Flow(sf_name)
|
||||||
|
|
||||||
@ -242,6 +243,17 @@ class AmphoraFlows(object):
|
|||||||
requires=constants.LOADBALANCER_ID,
|
requires=constants.LOADBALANCER_ID,
|
||||||
provides=constants.AMPHORAE_NETWORK_CONFIG))
|
provides=constants.AMPHORAE_NETWORK_CONFIG))
|
||||||
|
|
||||||
|
if get_amphorae_status:
|
||||||
|
# Get the amphorae_status dict in case the caller hasn't fetched
|
||||||
|
# it yet.
|
||||||
|
vrrp_subflow.add(
|
||||||
|
amphora_driver_tasks.AmphoraeGetConnectivityStatus(
|
||||||
|
name=constants.AMPHORAE_GET_CONNECTIVITY_STATUS,
|
||||||
|
requires=constants.AMPHORAE,
|
||||||
|
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
|
||||||
|
inject={constants.TIMEOUT_DICT: timeout_dict},
|
||||||
|
provides=constants.AMPHORAE_STATUS))
|
||||||
|
|
||||||
# VRRP update needs to be run on all amphora to update
|
# VRRP update needs to be run on all amphora to update
|
||||||
# their peer configurations. So parallelize this with an
|
# their peer configurations. So parallelize this with an
|
||||||
# unordered subflow.
|
# unordered subflow.
|
||||||
@ -252,7 +264,7 @@ class AmphoraFlows(object):
|
|||||||
|
|
||||||
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
|
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
|
||||||
name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF,
|
name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF,
|
||||||
requires=constants.AMPHORAE,
|
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
|
||||||
inject={constants.AMPHORA_INDEX: 0,
|
inject={constants.AMPHORA_INDEX: 0,
|
||||||
constants.TIMEOUT_DICT: timeout_dict},
|
constants.TIMEOUT_DICT: timeout_dict},
|
||||||
provides=constants.AMP_VRRP_INT))
|
provides=constants.AMP_VRRP_INT))
|
||||||
@ -261,13 +273,13 @@ class AmphoraFlows(object):
|
|||||||
name=sf_name + '-0-' + constants.AMP_VRRP_UPDATE,
|
name=sf_name + '-0-' + constants.AMP_VRRP_UPDATE,
|
||||||
requires=(constants.LOADBALANCER_ID,
|
requires=(constants.LOADBALANCER_ID,
|
||||||
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
|
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
|
||||||
constants.AMP_VRRP_INT),
|
constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT),
|
||||||
inject={constants.AMPHORA_INDEX: 0,
|
inject={constants.AMPHORA_INDEX: 0,
|
||||||
constants.TIMEOUT_DICT: timeout_dict}))
|
constants.TIMEOUT_DICT: timeout_dict}))
|
||||||
|
|
||||||
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
|
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
|
||||||
name=sf_name + '-0-' + constants.AMP_VRRP_START,
|
name=sf_name + '-0-' + constants.AMP_VRRP_START,
|
||||||
requires=constants.AMPHORAE,
|
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
|
||||||
inject={constants.AMPHORA_INDEX: 0,
|
inject={constants.AMPHORA_INDEX: 0,
|
||||||
constants.TIMEOUT_DICT: timeout_dict}))
|
constants.TIMEOUT_DICT: timeout_dict}))
|
||||||
|
|
||||||
@ -275,7 +287,7 @@ class AmphoraFlows(object):
|
|||||||
|
|
||||||
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
|
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
|
||||||
name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF,
|
name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF,
|
||||||
requires=constants.AMPHORAE,
|
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
|
||||||
inject={constants.AMPHORA_INDEX: 1,
|
inject={constants.AMPHORA_INDEX: 1,
|
||||||
constants.TIMEOUT_DICT: timeout_dict},
|
constants.TIMEOUT_DICT: timeout_dict},
|
||||||
provides=constants.AMP_VRRP_INT))
|
provides=constants.AMP_VRRP_INT))
|
||||||
@ -284,12 +296,12 @@ class AmphoraFlows(object):
|
|||||||
name=sf_name + '-1-' + constants.AMP_VRRP_UPDATE,
|
name=sf_name + '-1-' + constants.AMP_VRRP_UPDATE,
|
||||||
requires=(constants.LOADBALANCER_ID,
|
requires=(constants.LOADBALANCER_ID,
|
||||||
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
|
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
|
||||||
constants.AMP_VRRP_INT),
|
constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT),
|
||||||
inject={constants.AMPHORA_INDEX: 1,
|
inject={constants.AMPHORA_INDEX: 1,
|
||||||
constants.TIMEOUT_DICT: timeout_dict}))
|
constants.TIMEOUT_DICT: timeout_dict}))
|
||||||
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
|
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
|
||||||
name=sf_name + '-1-' + constants.AMP_VRRP_START,
|
name=sf_name + '-1-' + constants.AMP_VRRP_START,
|
||||||
requires=constants.AMPHORAE,
|
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
|
||||||
inject={constants.AMPHORA_INDEX: 1,
|
inject={constants.AMPHORA_INDEX: 1,
|
||||||
constants.TIMEOUT_DICT: timeout_dict}))
|
constants.TIMEOUT_DICT: timeout_dict}))
|
||||||
|
|
||||||
@ -537,6 +549,14 @@ class AmphoraFlows(object):
|
|||||||
constants.CONN_RETRY_INTERVAL:
|
constants.CONN_RETRY_INTERVAL:
|
||||||
CONF.haproxy_amphora.active_connection_rety_interval}
|
CONF.haproxy_amphora.active_connection_rety_interval}
|
||||||
|
|
||||||
|
failover_amp_flow.add(
|
||||||
|
amphora_driver_tasks.AmphoraeGetConnectivityStatus(
|
||||||
|
name=constants.AMPHORAE_GET_CONNECTIVITY_STATUS,
|
||||||
|
requires=constants.AMPHORAE,
|
||||||
|
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
|
||||||
|
inject={constants.TIMEOUT_DICT: timeout_dict},
|
||||||
|
provides=constants.AMPHORAE_STATUS))
|
||||||
|
|
||||||
# Listeners update needs to be run on all amphora to update
|
# Listeners update needs to be run on all amphora to update
|
||||||
# their peer configurations. So parallelize this with an
|
# their peer configurations. So parallelize this with an
|
||||||
# unordered subflow.
|
# unordered subflow.
|
||||||
@ -547,7 +567,8 @@ class AmphoraFlows(object):
|
|||||||
update_amps_subflow.add(
|
update_amps_subflow.add(
|
||||||
amphora_driver_tasks.AmphoraIndexListenerUpdate(
|
amphora_driver_tasks.AmphoraIndexListenerUpdate(
|
||||||
name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE,
|
name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE,
|
||||||
requires=(constants.LOADBALANCER, constants.AMPHORAE),
|
requires=(constants.LOADBALANCER, constants.AMPHORAE,
|
||||||
|
constants.AMPHORAE_STATUS),
|
||||||
inject={constants.AMPHORA_INDEX: amp_index,
|
inject={constants.AMPHORA_INDEX: amp_index,
|
||||||
constants.TIMEOUT_DICT: timeout_dict}))
|
constants.TIMEOUT_DICT: timeout_dict}))
|
||||||
|
|
||||||
@ -557,7 +578,8 @@ class AmphoraFlows(object):
|
|||||||
if lb_amp_count == 2:
|
if lb_amp_count == 2:
|
||||||
failover_amp_flow.add(
|
failover_amp_flow.add(
|
||||||
self.get_vrrp_subflow(constants.GET_VRRP_SUBFLOW,
|
self.get_vrrp_subflow(constants.GET_VRRP_SUBFLOW,
|
||||||
timeout_dict, create_vrrp_group=False))
|
timeout_dict, create_vrrp_group=False,
|
||||||
|
get_amphorae_status=False))
|
||||||
|
|
||||||
# Reload the listener. This needs to be done here because
|
# Reload the listener. This needs to be done here because
|
||||||
# it will create the required haproxy check scripts for
|
# it will create the required haproxy check scripts for
|
||||||
@ -573,7 +595,8 @@ class AmphoraFlows(object):
|
|||||||
amphora_driver_tasks.AmphoraIndexListenersReload(
|
amphora_driver_tasks.AmphoraIndexListenersReload(
|
||||||
name=(str(amp_index) + '-' +
|
name=(str(amp_index) + '-' +
|
||||||
constants.AMPHORA_RELOAD_LISTENER),
|
constants.AMPHORA_RELOAD_LISTENER),
|
||||||
requires=(constants.LOADBALANCER, constants.AMPHORAE),
|
requires=(constants.LOADBALANCER, constants.AMPHORAE,
|
||||||
|
constants.AMPHORAE_STATUS),
|
||||||
inject={constants.AMPHORA_INDEX: amp_index,
|
inject={constants.AMPHORA_INDEX: amp_index,
|
||||||
constants.TIMEOUT_DICT: timeout_dict}))
|
constants.TIMEOUT_DICT: timeout_dict}))
|
||||||
|
|
||||||
|
@ -611,6 +611,14 @@ class LoadBalancerFlows(object):
|
|||||||
requires=constants.LOADBALANCER_ID,
|
requires=constants.LOADBALANCER_ID,
|
||||||
provides=constants.AMPHORAE))
|
provides=constants.AMPHORAE))
|
||||||
|
|
||||||
|
failover_LB_flow.add(
|
||||||
|
amphora_driver_tasks.AmphoraeGetConnectivityStatus(
|
||||||
|
name=(new_amp_role + '-' +
|
||||||
|
constants.AMPHORAE_GET_CONNECTIVITY_STATUS),
|
||||||
|
requires=constants.AMPHORAE,
|
||||||
|
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
|
||||||
|
provides=constants.AMPHORAE_STATUS))
|
||||||
|
|
||||||
# Listeners update needs to be run on all amphora to update
|
# Listeners update needs to be run on all amphora to update
|
||||||
# their peer configurations. So parallelize this with an
|
# their peer configurations. So parallelize this with an
|
||||||
# unordered subflow.
|
# unordered subflow.
|
||||||
@ -625,14 +633,16 @@ class LoadBalancerFlows(object):
|
|||||||
amphora_driver_tasks.AmphoraIndexListenerUpdate(
|
amphora_driver_tasks.AmphoraIndexListenerUpdate(
|
||||||
name=(constants.AMPHORA + '-0-' +
|
name=(constants.AMPHORA + '-0-' +
|
||||||
constants.AMP_LISTENER_UPDATE),
|
constants.AMP_LISTENER_UPDATE),
|
||||||
requires=(constants.LOADBALANCER, constants.AMPHORAE),
|
requires=(constants.LOADBALANCER, constants.AMPHORAE,
|
||||||
|
constants.AMPHORAE_STATUS),
|
||||||
inject={constants.AMPHORA_INDEX: 0,
|
inject={constants.AMPHORA_INDEX: 0,
|
||||||
constants.TIMEOUT_DICT: timeout_dict}))
|
constants.TIMEOUT_DICT: timeout_dict}))
|
||||||
update_amps_subflow.add(
|
update_amps_subflow.add(
|
||||||
amphora_driver_tasks.AmphoraIndexListenerUpdate(
|
amphora_driver_tasks.AmphoraIndexListenerUpdate(
|
||||||
name=(constants.AMPHORA + '-1-' +
|
name=(constants.AMPHORA + '-1-' +
|
||||||
constants.AMP_LISTENER_UPDATE),
|
constants.AMP_LISTENER_UPDATE),
|
||||||
requires=(constants.LOADBALANCER, constants.AMPHORAE),
|
requires=(constants.LOADBALANCER, constants.AMPHORAE,
|
||||||
|
constants.AMPHORAE_STATUS),
|
||||||
inject={constants.AMPHORA_INDEX: 1,
|
inject={constants.AMPHORA_INDEX: 1,
|
||||||
constants.TIMEOUT_DICT: timeout_dict}))
|
constants.TIMEOUT_DICT: timeout_dict}))
|
||||||
|
|
||||||
@ -641,7 +651,8 @@ class LoadBalancerFlows(object):
|
|||||||
# Configure and enable keepalived in the amphora
|
# Configure and enable keepalived in the amphora
|
||||||
failover_LB_flow.add(self.amp_flows.get_vrrp_subflow(
|
failover_LB_flow.add(self.amp_flows.get_vrrp_subflow(
|
||||||
new_amp_role + '-' + constants.GET_VRRP_SUBFLOW,
|
new_amp_role + '-' + constants.GET_VRRP_SUBFLOW,
|
||||||
timeout_dict, create_vrrp_group=False))
|
timeout_dict, create_vrrp_group=False,
|
||||||
|
get_amphorae_status=False))
|
||||||
|
|
||||||
# #### End of standby ####
|
# #### End of standby ####
|
||||||
|
|
||||||
|
@ -14,6 +14,9 @@
|
|||||||
#
|
#
|
||||||
|
|
||||||
import copy
|
import copy
|
||||||
|
from typing import List
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from cryptography import fernet
|
from cryptography import fernet
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
@ -99,10 +102,19 @@ class AmpListenersUpdate(BaseAmphoraTask):
|
|||||||
class AmphoraIndexListenerUpdate(BaseAmphoraTask):
|
class AmphoraIndexListenerUpdate(BaseAmphoraTask):
|
||||||
"""Task to update the listeners on one amphora."""
|
"""Task to update the listeners on one amphora."""
|
||||||
|
|
||||||
def execute(self, loadbalancer, amphora_index, amphorae, timeout_dict=()):
|
def execute(self, loadbalancer, amphora_index, amphorae,
|
||||||
|
amphorae_status: dict, timeout_dict=()):
|
||||||
# Note, we don't want this to cause a revert as it may be used
|
# Note, we don't want this to cause a revert as it may be used
|
||||||
# in a failover flow with both amps failing. Skip it and let
|
# in a failover flow with both amps failing. Skip it and let
|
||||||
# health manager fix it.
|
# health manager fix it.
|
||||||
|
|
||||||
|
amphora_id = amphorae[amphora_index].get(constants.ID)
|
||||||
|
amphora_status = amphorae_status.get(amphora_id, {})
|
||||||
|
if amphora_status.get(constants.UNREACHABLE):
|
||||||
|
LOG.warning("Skipping listener update because amphora %s "
|
||||||
|
"is not reachable.", amphora_id)
|
||||||
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# TODO(johnsom) Optimize this to use the dicts and not need the
|
# TODO(johnsom) Optimize this to use the dicts and not need the
|
||||||
# DB lookups
|
# DB lookups
|
||||||
@ -115,7 +127,6 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask):
|
|||||||
self.amphora_driver.update_amphora_listeners(
|
self.amphora_driver.update_amphora_listeners(
|
||||||
db_lb, db_amp, timeout_dict)
|
db_lb, db_amp, timeout_dict)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
amphora_id = amphorae[amphora_index].get(constants.ID)
|
|
||||||
LOG.error('Failed to update listeners on amphora %s. Skipping '
|
LOG.error('Failed to update listeners on amphora %s. Skipping '
|
||||||
'this amphora as it is failing to update due to: %s',
|
'this amphora as it is failing to update due to: %s',
|
||||||
amphora_id, str(e))
|
amphora_id, str(e))
|
||||||
@ -177,10 +188,18 @@ class AmphoraIndexListenersReload(BaseAmphoraTask):
|
|||||||
"""Task to reload all listeners on an amphora."""
|
"""Task to reload all listeners on an amphora."""
|
||||||
|
|
||||||
def execute(self, loadbalancer, amphora_index, amphorae,
|
def execute(self, loadbalancer, amphora_index, amphorae,
|
||||||
timeout_dict=None):
|
amphorae_status: dict, timeout_dict=None):
|
||||||
"""Execute listener reload routines for listeners on an amphora."""
|
"""Execute listener reload routines for listeners on an amphora."""
|
||||||
if amphorae is None:
|
if amphorae is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
amphora_id = amphorae[amphora_index].get(constants.ID)
|
||||||
|
amphora_status = amphorae_status.get(amphora_id, {})
|
||||||
|
if amphora_status.get(constants.UNREACHABLE):
|
||||||
|
LOG.warning("Skipping listener reload because amphora %s "
|
||||||
|
"is not reachable.", amphora_id)
|
||||||
|
return
|
||||||
|
|
||||||
# TODO(johnsom) Optimize this to use the dicts and not need the
|
# TODO(johnsom) Optimize this to use the dicts and not need the
|
||||||
# DB lookups
|
# DB lookups
|
||||||
db_amp = self.amphora_repo.get(
|
db_amp = self.amphora_repo.get(
|
||||||
@ -192,7 +211,6 @@ class AmphoraIndexListenersReload(BaseAmphoraTask):
|
|||||||
try:
|
try:
|
||||||
self.amphora_driver.reload(db_lb, db_amp, timeout_dict)
|
self.amphora_driver.reload(db_lb, db_amp, timeout_dict)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
amphora_id = amphorae[amphora_index][constants.ID]
|
|
||||||
LOG.warning('Failed to reload listeners on amphora %s. '
|
LOG.warning('Failed to reload listeners on amphora %s. '
|
||||||
'Skipping this amphora as it is failing to '
|
'Skipping this amphora as it is failing to '
|
||||||
'reload due to: %s', amphora_id, str(e))
|
'reload due to: %s', amphora_id, str(e))
|
||||||
@ -421,8 +439,15 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask):
|
|||||||
class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask):
|
class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask):
|
||||||
"""Task to get and update the VRRP interface device name from amphora."""
|
"""Task to get and update the VRRP interface device name from amphora."""
|
||||||
|
|
||||||
def execute(self, amphora_index, amphorae, timeout_dict=None):
|
def execute(self, amphora_index, amphorae, amphorae_status: dict,
|
||||||
|
timeout_dict=None):
|
||||||
amphora_id = amphorae[amphora_index][constants.ID]
|
amphora_id = amphorae[amphora_index][constants.ID]
|
||||||
|
amphora_status = amphorae_status.get(amphora_id, {})
|
||||||
|
if amphora_status.get(constants.UNREACHABLE):
|
||||||
|
LOG.warning("Skipping VRRP interface update because amphora %s "
|
||||||
|
"is not reachable.", amphora_id)
|
||||||
|
return None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# TODO(johnsom) Optimize this to use the dicts and not need the
|
# TODO(johnsom) Optimize this to use the dicts and not need the
|
||||||
# DB lookups
|
# DB lookups
|
||||||
@ -478,12 +503,19 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask):
|
|||||||
"""Task to update the VRRP configuration of an amphora."""
|
"""Task to update the VRRP configuration of an amphora."""
|
||||||
|
|
||||||
def execute(self, loadbalancer_id, amphorae_network_config, amphora_index,
|
def execute(self, loadbalancer_id, amphorae_network_config, amphora_index,
|
||||||
amphorae, amp_vrrp_int, timeout_dict=None):
|
amphorae, amphorae_status: dict, amp_vrrp_int: Optional[str],
|
||||||
|
timeout_dict=None):
|
||||||
"""Execute update_vrrp_conf."""
|
"""Execute update_vrrp_conf."""
|
||||||
# Note, we don't want this to cause a revert as it may be used
|
# Note, we don't want this to cause a revert as it may be used
|
||||||
# in a failover flow with both amps failing. Skip it and let
|
# in a failover flow with both amps failing. Skip it and let
|
||||||
# health manager fix it.
|
# health manager fix it.
|
||||||
amphora_id = amphorae[amphora_index][constants.ID]
|
amphora_id = amphorae[amphora_index][constants.ID]
|
||||||
|
amphora_status = amphorae_status.get(amphora_id, {})
|
||||||
|
if amphora_status.get(constants.UNREACHABLE):
|
||||||
|
LOG.warning("Skipping VRRP configuration because amphora %s "
|
||||||
|
"is not reachable.", amphora_id)
|
||||||
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# TODO(johnsom) Optimize this to use the dicts and not need the
|
# TODO(johnsom) Optimize this to use the dicts and not need the
|
||||||
# DB lookups
|
# DB lookups
|
||||||
@ -525,10 +557,17 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask):
|
|||||||
This will reload keepalived if it is already running.
|
This will reload keepalived if it is already running.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def execute(self, amphora_index, amphorae, timeout_dict=None):
|
def execute(self, amphora_index, amphorae, amphorae_status: dict,
|
||||||
|
timeout_dict=None):
|
||||||
# TODO(johnsom) Optimize this to use the dicts and not need the
|
# TODO(johnsom) Optimize this to use the dicts and not need the
|
||||||
# DB lookups
|
# DB lookups
|
||||||
amphora_id = amphorae[amphora_index][constants.ID]
|
amphora_id = amphorae[amphora_index][constants.ID]
|
||||||
|
amphora_status = amphorae_status.get(amphora_id, {})
|
||||||
|
if amphora_status.get(constants.UNREACHABLE):
|
||||||
|
LOG.warning("Skipping VRRP start because amphora %s "
|
||||||
|
"is not reachable.", amphora_id)
|
||||||
|
return
|
||||||
|
|
||||||
db_amp = self.amphora_repo.get(db_apis.get_session(), id=amphora_id)
|
db_amp = self.amphora_repo.get(db_apis.get_session(), id=amphora_id)
|
||||||
try:
|
try:
|
||||||
self.amphora_driver.start_vrrp_service(db_amp, timeout_dict)
|
self.amphora_driver.start_vrrp_service(db_amp, timeout_dict)
|
||||||
@ -592,3 +631,40 @@ class AmphoraConfigUpdate(BaseAmphoraTask):
|
|||||||
'update. Please update the amphora image for this '
|
'update. Please update the amphora image for this '
|
||||||
'amphora. Skipping.'.
|
'amphora. Skipping.'.
|
||||||
format(amphora.get(constants.ID)))
|
format(amphora.get(constants.ID)))
|
||||||
|
|
||||||
|
|
||||||
|
class AmphoraeGetConnectivityStatus(BaseAmphoraTask):
|
||||||
|
"""Task that checks amphorae connectivity status.
|
||||||
|
|
||||||
|
Check and return the connectivity status of both amphorae in ACTIVE STANDBY
|
||||||
|
load balancers
|
||||||
|
"""
|
||||||
|
|
||||||
|
def execute(self, amphorae: List[dict], new_amphora_id: str,
|
||||||
|
timeout_dict=None):
|
||||||
|
amphorae_status = {}
|
||||||
|
|
||||||
|
for amphora in amphorae:
|
||||||
|
amphora_id = amphora[constants.ID]
|
||||||
|
amphorae_status[amphora_id] = {}
|
||||||
|
|
||||||
|
session = db_apis.get_session()
|
||||||
|
with session.begin():
|
||||||
|
db_amp = self.amphora_repo.get(session, id=amphora_id)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Verify if the amphora is reachable
|
||||||
|
self.amphora_driver.check(db_amp, timeout_dict=timeout_dict)
|
||||||
|
except Exception as e:
|
||||||
|
LOG.exception("Cannot get status for amphora %s",
|
||||||
|
amphora_id)
|
||||||
|
# In case it fails and the tested amphora is the newly created
|
||||||
|
# amphora, it's not a normal error handling, re-raise the
|
||||||
|
# exception
|
||||||
|
if amphora_id == new_amphora_id:
|
||||||
|
raise e
|
||||||
|
amphorae_status[amphora_id][constants.UNREACHABLE] = True
|
||||||
|
else:
|
||||||
|
amphorae_status[amphora_id][constants.UNREACHABLE] = False
|
||||||
|
|
||||||
|
return amphorae_status
|
||||||
|
@ -75,9 +75,9 @@ class TestHAProxyAmphoraDriver(base.TestCase):
|
|||||||
mock_api_version.reset_mock()
|
mock_api_version.reset_mock()
|
||||||
client_mock.reset_mock()
|
client_mock.reset_mock()
|
||||||
|
|
||||||
result = self.driver.get_interface_from_ip(amphora_mock, IP_ADDRESS)
|
self.assertRaises(
|
||||||
|
exc.NotFound,
|
||||||
self.assertIsNone(result)
|
self.driver.get_interface_from_ip, amphora_mock, IP_ADDRESS)
|
||||||
mock_api_version.assert_called_once_with(amphora_mock, None)
|
mock_api_version.assert_called_once_with(amphora_mock, None)
|
||||||
client_mock.get_interface.assert_called_once_with(
|
client_mock.get_interface.assert_called_once_with(
|
||||||
amphora_mock, IP_ADDRESS, None, log_error=False)
|
amphora_mock, IP_ADDRESS, None, log_error=False)
|
||||||
|
@ -243,6 +243,7 @@ class TestAmphoraFlows(base.TestCase):
|
|||||||
self.assertIn(constants.AMPHORA, amp_flow.provides)
|
self.assertIn(constants.AMPHORA, amp_flow.provides)
|
||||||
self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
|
self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
|
||||||
self.assertIn(constants.AMPHORAE, amp_flow.provides)
|
self.assertIn(constants.AMPHORAE, amp_flow.provides)
|
||||||
|
self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides)
|
||||||
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
|
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
|
||||||
self.assertIn(constants.BASE_PORT, amp_flow.provides)
|
self.assertIn(constants.BASE_PORT, amp_flow.provides)
|
||||||
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
|
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
|
||||||
@ -253,7 +254,7 @@ class TestAmphoraFlows(base.TestCase):
|
|||||||
self.assertIn(constants.VIP_SG_ID, amp_flow.provides)
|
self.assertIn(constants.VIP_SG_ID, amp_flow.provides)
|
||||||
|
|
||||||
self.assertEqual(7, len(amp_flow.requires))
|
self.assertEqual(7, len(amp_flow.requires))
|
||||||
self.assertEqual(13, len(amp_flow.provides))
|
self.assertEqual(14, len(amp_flow.provides))
|
||||||
|
|
||||||
def test_get_failover_flow_standalone(self, mock_get_net_driver):
|
def test_get_failover_flow_standalone(self, mock_get_net_driver):
|
||||||
failed_amphora = data_models.Amphora(
|
failed_amphora = data_models.Amphora(
|
||||||
@ -276,6 +277,7 @@ class TestAmphoraFlows(base.TestCase):
|
|||||||
self.assertIn(constants.AMPHORA, amp_flow.provides)
|
self.assertIn(constants.AMPHORA, amp_flow.provides)
|
||||||
self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
|
self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
|
||||||
self.assertIn(constants.AMPHORAE, amp_flow.provides)
|
self.assertIn(constants.AMPHORAE, amp_flow.provides)
|
||||||
|
self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides)
|
||||||
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
|
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
|
||||||
self.assertIn(constants.BASE_PORT, amp_flow.provides)
|
self.assertIn(constants.BASE_PORT, amp_flow.provides)
|
||||||
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
|
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
|
||||||
@ -286,7 +288,7 @@ class TestAmphoraFlows(base.TestCase):
|
|||||||
self.assertIn(constants.VIP_SG_ID, amp_flow.provides)
|
self.assertIn(constants.VIP_SG_ID, amp_flow.provides)
|
||||||
|
|
||||||
self.assertEqual(7, len(amp_flow.requires))
|
self.assertEqual(7, len(amp_flow.requires))
|
||||||
self.assertEqual(12, len(amp_flow.provides))
|
self.assertEqual(13, len(amp_flow.provides))
|
||||||
|
|
||||||
def test_get_failover_flow_bogus_role(self, mock_get_net_driver):
|
def test_get_failover_flow_bogus_role(self, mock_get_net_driver):
|
||||||
failed_amphora = data_models.Amphora(id=uuidutils.generate_uuid(),
|
failed_amphora = data_models.Amphora(id=uuidutils.generate_uuid(),
|
||||||
@ -324,12 +326,30 @@ class TestAmphoraFlows(base.TestCase):
|
|||||||
|
|
||||||
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides)
|
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides)
|
||||||
self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides)
|
self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides)
|
||||||
|
self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.provides)
|
||||||
|
|
||||||
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
|
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
|
||||||
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
|
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
|
||||||
|
self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires)
|
||||||
|
|
||||||
|
self.assertEqual(3, len(vrrp_subflow.provides))
|
||||||
|
self.assertEqual(3, len(vrrp_subflow.requires))
|
||||||
|
|
||||||
|
def test_get_vrrp_subflow_dont_get_status(self, mock_get_net_driver):
|
||||||
|
vrrp_subflow = self.AmpFlow.get_vrrp_subflow('123',
|
||||||
|
get_amphorae_status=False)
|
||||||
|
|
||||||
|
self.assertIsInstance(vrrp_subflow, flow.Flow)
|
||||||
|
|
||||||
|
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides)
|
||||||
|
self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides)
|
||||||
|
|
||||||
|
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
|
||||||
|
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
|
||||||
|
self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.requires)
|
||||||
|
|
||||||
self.assertEqual(2, len(vrrp_subflow.provides))
|
self.assertEqual(2, len(vrrp_subflow.provides))
|
||||||
self.assertEqual(2, len(vrrp_subflow.requires))
|
self.assertEqual(3, len(vrrp_subflow.requires))
|
||||||
|
|
||||||
def test_get_vrrp_subflow_dont_create_vrrp_group(
|
def test_get_vrrp_subflow_dont_create_vrrp_group(
|
||||||
self, mock_get_net_driver):
|
self, mock_get_net_driver):
|
||||||
@ -340,12 +360,14 @@ class TestAmphoraFlows(base.TestCase):
|
|||||||
|
|
||||||
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides)
|
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides)
|
||||||
self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides)
|
self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides)
|
||||||
|
self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.provides)
|
||||||
|
|
||||||
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
|
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
|
||||||
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
|
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
|
||||||
|
self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires)
|
||||||
|
|
||||||
self.assertEqual(2, len(vrrp_subflow.provides))
|
self.assertEqual(3, len(vrrp_subflow.provides))
|
||||||
self.assertEqual(2, len(vrrp_subflow.requires))
|
self.assertEqual(3, len(vrrp_subflow.requires))
|
||||||
|
|
||||||
def test_get_post_map_lb_subflow(self, mock_get_net_driver):
|
def test_get_post_map_lb_subflow(self, mock_get_net_driver):
|
||||||
|
|
||||||
|
@ -155,10 +155,16 @@ class TestLoadBalancerFlows(base.TestCase):
|
|||||||
|
|
||||||
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
|
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
|
||||||
self.assertIn(constants.UPDATE_DICT, amp_flow.requires)
|
self.assertIn(constants.UPDATE_DICT, amp_flow.requires)
|
||||||
|
self.assertIn(constants.AMPHORA_ID, amp_flow.requires)
|
||||||
|
|
||||||
|
self.assertIn(constants.AMPHORAE, amp_flow.provides)
|
||||||
|
self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides)
|
||||||
|
self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides)
|
||||||
|
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
|
||||||
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
|
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
|
||||||
|
|
||||||
self.assertEqual(4, len(amp_flow.provides))
|
self.assertEqual(5, len(amp_flow.provides))
|
||||||
self.assertEqual(2, len(amp_flow.requires))
|
self.assertEqual(3, len(amp_flow.requires))
|
||||||
|
|
||||||
amp_flow = self.LBFlow.get_post_lb_amp_association_flow(
|
amp_flow = self.LBFlow.get_post_lb_amp_association_flow(
|
||||||
'123', constants.TOPOLOGY_ACTIVE_STANDBY)
|
'123', constants.TOPOLOGY_ACTIVE_STANDBY)
|
||||||
@ -167,10 +173,16 @@ class TestLoadBalancerFlows(base.TestCase):
|
|||||||
|
|
||||||
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
|
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
|
||||||
self.assertIn(constants.UPDATE_DICT, amp_flow.requires)
|
self.assertIn(constants.UPDATE_DICT, amp_flow.requires)
|
||||||
|
self.assertIn(constants.AMPHORA_ID, amp_flow.requires)
|
||||||
|
|
||||||
|
self.assertIn(constants.AMPHORAE, amp_flow.provides)
|
||||||
|
self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides)
|
||||||
|
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
|
||||||
|
self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides)
|
||||||
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
|
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
|
||||||
|
|
||||||
self.assertEqual(4, len(amp_flow.provides))
|
self.assertEqual(5, len(amp_flow.provides))
|
||||||
self.assertEqual(2, len(amp_flow.requires))
|
self.assertEqual(3, len(amp_flow.requires))
|
||||||
|
|
||||||
def test_get_create_load_balancer_flows_single_listeners(
|
def test_get_create_load_balancer_flows_single_listeners(
|
||||||
self, mock_get_net_driver):
|
self, mock_get_net_driver):
|
||||||
@ -219,6 +231,7 @@ class TestLoadBalancerFlows(base.TestCase):
|
|||||||
self.assertIn(constants.LISTENERS, create_flow.provides)
|
self.assertIn(constants.LISTENERS, create_flow.provides)
|
||||||
self.assertIn(constants.AMPHORA, create_flow.provides)
|
self.assertIn(constants.AMPHORA, create_flow.provides)
|
||||||
self.assertIn(constants.AMPHORA_ID, create_flow.provides)
|
self.assertIn(constants.AMPHORA_ID, create_flow.provides)
|
||||||
|
self.assertIn(constants.AMPHORAE_STATUS, create_flow.provides)
|
||||||
self.assertIn(constants.COMPUTE_ID, create_flow.provides)
|
self.assertIn(constants.COMPUTE_ID, create_flow.provides)
|
||||||
self.assertIn(constants.COMPUTE_OBJ, create_flow.provides)
|
self.assertIn(constants.COMPUTE_OBJ, create_flow.provides)
|
||||||
self.assertIn(constants.LOADBALANCER, create_flow.provides)
|
self.assertIn(constants.LOADBALANCER, create_flow.provides)
|
||||||
@ -230,7 +243,7 @@ class TestLoadBalancerFlows(base.TestCase):
|
|||||||
create_flow.provides)
|
create_flow.provides)
|
||||||
|
|
||||||
self.assertEqual(6, len(create_flow.requires))
|
self.assertEqual(6, len(create_flow.requires))
|
||||||
self.assertEqual(16, len(create_flow.provides),
|
self.assertEqual(17, len(create_flow.provides),
|
||||||
create_flow.provides)
|
create_flow.provides)
|
||||||
|
|
||||||
def _test_get_failover_LB_flow_single(self, amphorae):
|
def _test_get_failover_LB_flow_single(self, amphorae):
|
||||||
@ -322,6 +335,7 @@ class TestLoadBalancerFlows(base.TestCase):
|
|||||||
self.assertIn(constants.AMPHORA, failover_flow.provides)
|
self.assertIn(constants.AMPHORA, failover_flow.provides)
|
||||||
self.assertIn(constants.AMPHORA_ID, failover_flow.provides)
|
self.assertIn(constants.AMPHORA_ID, failover_flow.provides)
|
||||||
self.assertIn(constants.AMPHORAE, failover_flow.provides)
|
self.assertIn(constants.AMPHORAE, failover_flow.provides)
|
||||||
|
self.assertIn(constants.AMPHORAE_STATUS, failover_flow.provides)
|
||||||
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG,
|
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG,
|
||||||
failover_flow.provides)
|
failover_flow.provides)
|
||||||
self.assertIn(constants.BASE_PORT, failover_flow.provides)
|
self.assertIn(constants.BASE_PORT, failover_flow.provides)
|
||||||
@ -339,7 +353,7 @@ class TestLoadBalancerFlows(base.TestCase):
|
|||||||
|
|
||||||
self.assertEqual(6, len(failover_flow.requires),
|
self.assertEqual(6, len(failover_flow.requires),
|
||||||
failover_flow.requires)
|
failover_flow.requires)
|
||||||
self.assertEqual(16, len(failover_flow.provides),
|
self.assertEqual(17, len(failover_flow.provides),
|
||||||
failover_flow.provides)
|
failover_flow.provides)
|
||||||
|
|
||||||
def test_get_failover_LB_flow_no_amps_act_stdby(self, mock_get_net_driver):
|
def test_get_failover_LB_flow_no_amps_act_stdby(self, mock_get_net_driver):
|
||||||
|
@ -125,17 +125,37 @@ class TestAmphoraDriverTasks(base.TestCase):
|
|||||||
mock_amphora_repo_update):
|
mock_amphora_repo_update):
|
||||||
|
|
||||||
mock_lb_repo_get.return_value = _LB_mock
|
mock_lb_repo_get.return_value = _LB_mock
|
||||||
|
amphorae_status = {
|
||||||
|
_amphora_mock.id: {
|
||||||
|
constants.UNREACHABLE: False
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate()
|
amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate()
|
||||||
amp_list_update_obj.execute(_load_balancer_mock, 0,
|
amp_list_update_obj.execute(_load_balancer_mock, 0,
|
||||||
[_amphora_mock], self.timeout_dict)
|
[_amphora_mock], amphorae_status,
|
||||||
|
self.timeout_dict)
|
||||||
|
|
||||||
mock_driver.update_amphora_listeners.assert_called_once_with(
|
mock_driver.update_amphora_listeners.assert_called_once_with(
|
||||||
_LB_mock, _amphora_mock, self.timeout_dict)
|
_LB_mock, _amphora_mock, self.timeout_dict)
|
||||||
|
|
||||||
|
# Unreachable amp
|
||||||
|
mock_driver.reset_mock()
|
||||||
|
amphorae_status = {
|
||||||
|
_amphora_mock.id: {
|
||||||
|
constants.UNREACHABLE: True
|
||||||
|
}
|
||||||
|
}
|
||||||
|
amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock],
|
||||||
|
amphorae_status, self.timeout_dict)
|
||||||
|
mock_driver.update_amphora_listeners.assert_not_called()
|
||||||
|
|
||||||
|
# Test exception
|
||||||
mock_driver.update_amphora_listeners.side_effect = Exception('boom')
|
mock_driver.update_amphora_listeners.side_effect = Exception('boom')
|
||||||
|
|
||||||
amp_list_update_obj.execute(_load_balancer_mock, 0,
|
amp_list_update_obj.execute(_load_balancer_mock, 0,
|
||||||
[_amphora_mock], self.timeout_dict)
|
[_amphora_mock], {},
|
||||||
|
self.timeout_dict)
|
||||||
|
|
||||||
mock_amphora_repo_update.assert_called_once_with(
|
mock_amphora_repo_update.assert_called_once_with(
|
||||||
_session_mock, AMP_ID, status=constants.ERROR)
|
_session_mock, AMP_ID, status=constants.ERROR)
|
||||||
@ -217,20 +237,38 @@ class TestAmphoraDriverTasks(base.TestCase):
|
|||||||
|
|
||||||
# Test no listeners
|
# Test no listeners
|
||||||
mock_lb.listeners = None
|
mock_lb.listeners = None
|
||||||
listeners_reload_obj.execute(mock_lb, 0, None)
|
listeners_reload_obj.execute(mock_lb, 0, None, {})
|
||||||
mock_driver.reload.assert_not_called()
|
mock_driver.reload.assert_not_called()
|
||||||
|
|
||||||
# Test with listeners
|
# Test with listeners
|
||||||
|
amphorae_status = {
|
||||||
|
_amphora_mock.id: {
|
||||||
|
constants.UNREACHABLE: False
|
||||||
|
}
|
||||||
|
}
|
||||||
mock_driver.start.reset_mock()
|
mock_driver.start.reset_mock()
|
||||||
mock_lb.listeners = [mock_listener]
|
mock_lb.listeners = [mock_listener]
|
||||||
listeners_reload_obj.execute(mock_lb, 0, [amphora_mock],
|
listeners_reload_obj.execute(mock_lb, 0, [amphora_mock],
|
||||||
|
amphorae_status,
|
||||||
timeout_dict=self.timeout_dict)
|
timeout_dict=self.timeout_dict)
|
||||||
mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock,
|
mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock,
|
||||||
self.timeout_dict)
|
self.timeout_dict)
|
||||||
|
|
||||||
|
# Unreachable amp
|
||||||
|
amphorae_status = {
|
||||||
|
_amphora_mock.id: {
|
||||||
|
constants.UNREACHABLE: True
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mock_driver.reload.reset_mock()
|
||||||
|
listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock],
|
||||||
|
amphorae_status,
|
||||||
|
timeout_dict=self.timeout_dict)
|
||||||
|
mock_driver.reload.assert_not_called()
|
||||||
|
|
||||||
# Test with reload exception
|
# Test with reload exception
|
||||||
mock_driver.reload.reset_mock()
|
mock_driver.reload.reset_mock()
|
||||||
listeners_reload_obj.execute(mock_lb, 0, [amphora_mock],
|
listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], {},
|
||||||
timeout_dict=self.timeout_dict)
|
timeout_dict=self.timeout_dict)
|
||||||
mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock,
|
mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock,
|
||||||
self.timeout_dict)
|
self.timeout_dict)
|
||||||
@ -604,6 +642,11 @@ class TestAmphoraDriverTasks(base.TestCase):
|
|||||||
_LB_mock.amphorae = _amphorae_mock
|
_LB_mock.amphorae = _amphorae_mock
|
||||||
mock_driver.get_interface_from_ip.side_effect = [FAKE_INTERFACE,
|
mock_driver.get_interface_from_ip.side_effect = [FAKE_INTERFACE,
|
||||||
Exception('boom')]
|
Exception('boom')]
|
||||||
|
amphorae_status = {
|
||||||
|
_amphora_mock.id: {
|
||||||
|
constants.UNREACHABLE: False
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
timeout_dict = {constants.CONN_MAX_RETRIES: CONN_MAX_RETRIES,
|
timeout_dict = {constants.CONN_MAX_RETRIES: CONN_MAX_RETRIES,
|
||||||
constants.CONN_RETRY_INTERVAL: CONN_RETRY_INTERVAL}
|
constants.CONN_RETRY_INTERVAL: CONN_RETRY_INTERVAL}
|
||||||
@ -611,16 +654,27 @@ class TestAmphoraDriverTasks(base.TestCase):
|
|||||||
amphora_update_vrrp_interface_obj = (
|
amphora_update_vrrp_interface_obj = (
|
||||||
amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface())
|
amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface())
|
||||||
amphora_update_vrrp_interface_obj.execute(
|
amphora_update_vrrp_interface_obj.execute(
|
||||||
0, [_amphora_mock], timeout_dict)
|
0, [_amphora_mock], amphorae_status, timeout_dict)
|
||||||
mock_driver.get_interface_from_ip.assert_called_once_with(
|
mock_driver.get_interface_from_ip.assert_called_once_with(
|
||||||
_amphora_mock, _amphora_mock.vrrp_ip, timeout_dict=timeout_dict)
|
_amphora_mock, _amphora_mock.vrrp_ip, timeout_dict=timeout_dict)
|
||||||
mock_amphora_repo_update.assert_called_once_with(
|
mock_amphora_repo_update.assert_called_once_with(
|
||||||
_session_mock, _amphora_mock.id, vrrp_interface=FAKE_INTERFACE)
|
_session_mock, _amphora_mock.id, vrrp_interface=FAKE_INTERFACE)
|
||||||
|
|
||||||
|
# Unreachable amp
|
||||||
|
mock_driver.reset_mock()
|
||||||
|
amphorae_status = {
|
||||||
|
_amphora_mock.id: {
|
||||||
|
constants.UNREACHABLE: True
|
||||||
|
}
|
||||||
|
}
|
||||||
|
amphora_update_vrrp_interface_obj.execute(
|
||||||
|
0, [_amphora_mock], amphorae_status, timeout_dict)
|
||||||
|
mock_driver.get_interface_from_ip.assert_not_called()
|
||||||
|
|
||||||
# Test with an exception
|
# Test with an exception
|
||||||
mock_amphora_repo_update.reset_mock()
|
mock_amphora_repo_update.reset_mock()
|
||||||
amphora_update_vrrp_interface_obj.execute(
|
amphora_update_vrrp_interface_obj.execute(
|
||||||
0, [_amphora_mock], timeout_dict)
|
0, [_amphora_mock], {}, timeout_dict)
|
||||||
mock_amphora_repo_update.assert_called_once_with(
|
mock_amphora_repo_update.assert_called_once_with(
|
||||||
_session_mock, _amphora_mock.id, status=constants.ERROR)
|
_session_mock, _amphora_mock.id, status=constants.ERROR)
|
||||||
|
|
||||||
@ -666,20 +720,41 @@ class TestAmphoraDriverTasks(base.TestCase):
|
|||||||
mock_driver.update_vrrp_conf.side_effect = [mock.DEFAULT,
|
mock_driver.update_vrrp_conf.side_effect = [mock.DEFAULT,
|
||||||
Exception('boom')]
|
Exception('boom')]
|
||||||
mock_lb_get.return_value = _LB_mock
|
mock_lb_get.return_value = _LB_mock
|
||||||
|
amphorae_status = {
|
||||||
|
_amphora_mock.id: {
|
||||||
|
constants.UNREACHABLE: False
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
amphora_vrrp_update_obj = (
|
amphora_vrrp_update_obj = (
|
||||||
amphora_driver_tasks.AmphoraIndexVRRPUpdate())
|
amphora_driver_tasks.AmphoraIndexVRRPUpdate())
|
||||||
|
|
||||||
amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config,
|
amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config,
|
||||||
0, [_amphora_mock], 'fakeint0',
|
0, [_amphora_mock], amphorae_status,
|
||||||
|
'fakeint0',
|
||||||
timeout_dict=self.timeout_dict)
|
timeout_dict=self.timeout_dict)
|
||||||
mock_driver.update_vrrp_conf.assert_called_once_with(
|
mock_driver.update_vrrp_conf.assert_called_once_with(
|
||||||
_LB_mock, amphorae_network_config, _amphora_mock,
|
_LB_mock, amphorae_network_config, _amphora_mock,
|
||||||
self.timeout_dict)
|
self.timeout_dict)
|
||||||
|
|
||||||
|
# Unreachable amp
|
||||||
|
amphorae_status = {
|
||||||
|
_amphora_mock.id: {
|
||||||
|
constants.UNREACHABLE: True
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mock_amphora_repo_update.reset_mock()
|
||||||
|
mock_driver.update_vrrp_conf.reset_mock()
|
||||||
|
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
|
||||||
|
0, [_amphora_mock], amphorae_status,
|
||||||
|
None)
|
||||||
|
mock_driver.update_vrrp_conf.assert_not_called()
|
||||||
|
|
||||||
# Test with an exception
|
# Test with an exception
|
||||||
mock_amphora_repo_update.reset_mock()
|
mock_amphora_repo_update.reset_mock()
|
||||||
amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config,
|
amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config,
|
||||||
0, [_amphora_mock], 'fakeint0')
|
0, [_amphora_mock], {},
|
||||||
|
'fakeint0')
|
||||||
mock_amphora_repo_update.assert_called_once_with(
|
mock_amphora_repo_update.assert_called_once_with(
|
||||||
_session_mock, _amphora_mock.id, status=constants.ERROR)
|
_session_mock, _amphora_mock.id, status=constants.ERROR)
|
||||||
|
|
||||||
@ -706,19 +781,36 @@ class TestAmphoraDriverTasks(base.TestCase):
|
|||||||
mock_listener_repo_get,
|
mock_listener_repo_get,
|
||||||
mock_listener_repo_update,
|
mock_listener_repo_update,
|
||||||
mock_amphora_repo_update):
|
mock_amphora_repo_update):
|
||||||
|
amphorae_status = {
|
||||||
|
_amphora_mock.id: {
|
||||||
|
constants.UNREACHABLE: False
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
amphora_vrrp_start_obj = (
|
amphora_vrrp_start_obj = (
|
||||||
amphora_driver_tasks.AmphoraIndexVRRPStart())
|
amphora_driver_tasks.AmphoraIndexVRRPStart())
|
||||||
mock_driver.start_vrrp_service.side_effect = [mock.DEFAULT,
|
mock_driver.start_vrrp_service.side_effect = [mock.DEFAULT,
|
||||||
Exception('boom')]
|
Exception('boom')]
|
||||||
|
|
||||||
amphora_vrrp_start_obj.execute(0, [_amphora_mock],
|
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status,
|
||||||
timeout_dict=self.timeout_dict)
|
timeout_dict=self.timeout_dict)
|
||||||
mock_driver.start_vrrp_service.assert_called_once_with(
|
mock_driver.start_vrrp_service.assert_called_once_with(
|
||||||
_amphora_mock, self.timeout_dict)
|
_amphora_mock, self.timeout_dict)
|
||||||
|
|
||||||
|
# Unreachable amp
|
||||||
|
mock_driver.start_vrrp_service.reset_mock()
|
||||||
|
amphorae_status = {
|
||||||
|
_amphora_mock.id: {
|
||||||
|
constants.UNREACHABLE: True
|
||||||
|
}
|
||||||
|
}
|
||||||
|
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status,
|
||||||
|
timeout_dict=self.timeout_dict)
|
||||||
|
mock_driver.start_vrrp_service.assert_not_called()
|
||||||
|
|
||||||
# Test with a start exception
|
# Test with a start exception
|
||||||
mock_driver.start_vrrp_service.reset_mock()
|
mock_driver.start_vrrp_service.reset_mock()
|
||||||
amphora_vrrp_start_obj.execute(0, [_amphora_mock],
|
amphora_vrrp_start_obj.execute(0, [_amphora_mock], {},
|
||||||
timeout_dict=self.timeout_dict)
|
timeout_dict=self.timeout_dict)
|
||||||
mock_driver.start_vrrp_service.assert_called_once_with(
|
mock_driver.start_vrrp_service.assert_called_once_with(
|
||||||
_amphora_mock, self.timeout_dict)
|
_amphora_mock, self.timeout_dict)
|
||||||
@ -790,3 +882,75 @@ class TestAmphoraDriverTasks(base.TestCase):
|
|||||||
self.assertRaises(driver_except.TimeOutException,
|
self.assertRaises(driver_except.TimeOutException,
|
||||||
amp_config_update_obj.execute,
|
amp_config_update_obj.execute,
|
||||||
_amphora_mock, flavor)
|
_amphora_mock, flavor)
|
||||||
|
|
||||||
|
@mock.patch('octavia.db.repositories.AmphoraRepository.get')
|
||||||
|
def test_amphorae_get_connectivity_status(self,
|
||||||
|
mock_amphora_repo_get,
|
||||||
|
mock_driver,
|
||||||
|
mock_generate_uuid,
|
||||||
|
mock_log,
|
||||||
|
mock_get_session,
|
||||||
|
mock_listener_repo_get,
|
||||||
|
mock_listener_repo_update,
|
||||||
|
mock_amphora_repo_update):
|
||||||
|
amphora1_mock = mock.MagicMock()
|
||||||
|
amphora1_mock.id = 'id1'
|
||||||
|
amphora2_mock = mock.MagicMock()
|
||||||
|
amphora2_mock.id = 'id2'
|
||||||
|
db_amphora1_mock = mock.Mock()
|
||||||
|
db_amphora2_mock = mock.Mock()
|
||||||
|
|
||||||
|
amp_get_connectivity_status = (
|
||||||
|
amphora_driver_tasks.AmphoraeGetConnectivityStatus())
|
||||||
|
|
||||||
|
# All amphorae reachable
|
||||||
|
mock_amphora_repo_get.side_effect = [
|
||||||
|
db_amphora1_mock,
|
||||||
|
db_amphora2_mock]
|
||||||
|
mock_driver.check.return_value = None
|
||||||
|
|
||||||
|
ret = amp_get_connectivity_status.execute(
|
||||||
|
[amphora1_mock, amphora2_mock],
|
||||||
|
amphora1_mock.id,
|
||||||
|
timeout_dict=self.timeout_dict)
|
||||||
|
mock_driver.check.assert_has_calls(
|
||||||
|
[mock.call(db_amphora1_mock, timeout_dict=self.timeout_dict),
|
||||||
|
mock.call(db_amphora2_mock, timeout_dict=self.timeout_dict)])
|
||||||
|
self.assertFalse(
|
||||||
|
ret[amphora1_mock.id][constants.UNREACHABLE])
|
||||||
|
self.assertFalse(
|
||||||
|
ret[amphora2_mock.id][constants.UNREACHABLE])
|
||||||
|
|
||||||
|
# amphora1 unreachable
|
||||||
|
mock_driver.check.reset_mock()
|
||||||
|
mock_amphora_repo_get.side_effect = [
|
||||||
|
db_amphora1_mock,
|
||||||
|
db_amphora2_mock]
|
||||||
|
mock_driver.check.side_effect = [
|
||||||
|
driver_except.TimeOutException, None]
|
||||||
|
self.assertRaises(driver_except.TimeOutException,
|
||||||
|
amp_get_connectivity_status.execute,
|
||||||
|
[amphora1_mock, amphora2_mock],
|
||||||
|
amphora1_mock.id,
|
||||||
|
timeout_dict=self.timeout_dict)
|
||||||
|
mock_driver.check.assert_called_with(
|
||||||
|
db_amphora1_mock, timeout_dict=self.timeout_dict)
|
||||||
|
|
||||||
|
# amphora2 unreachable
|
||||||
|
mock_driver.check.reset_mock()
|
||||||
|
mock_amphora_repo_get.side_effect = [
|
||||||
|
db_amphora1_mock,
|
||||||
|
db_amphora2_mock]
|
||||||
|
mock_driver.check.side_effect = [
|
||||||
|
None, driver_except.TimeOutException]
|
||||||
|
ret = amp_get_connectivity_status.execute(
|
||||||
|
[amphora1_mock, amphora2_mock],
|
||||||
|
amphora1_mock.id,
|
||||||
|
timeout_dict=self.timeout_dict)
|
||||||
|
mock_driver.check.assert_has_calls(
|
||||||
|
[mock.call(db_amphora1_mock, timeout_dict=self.timeout_dict),
|
||||||
|
mock.call(db_amphora2_mock, timeout_dict=self.timeout_dict)])
|
||||||
|
self.assertFalse(
|
||||||
|
ret[amphora1_mock.id][constants.UNREACHABLE])
|
||||||
|
self.assertTrue(
|
||||||
|
ret[amphora2_mock.id][constants.UNREACHABLE])
|
||||||
|
@ -286,6 +286,7 @@ class TestAmphoraFlows(base.TestCase):
|
|||||||
self.assertIn(constants.AMPHORA, amp_flow.provides)
|
self.assertIn(constants.AMPHORA, amp_flow.provides)
|
||||||
self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
|
self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
|
||||||
self.assertIn(constants.AMPHORAE, amp_flow.provides)
|
self.assertIn(constants.AMPHORAE, amp_flow.provides)
|
||||||
|
self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides)
|
||||||
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
|
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
|
||||||
self.assertIn(constants.BASE_PORT, amp_flow.provides)
|
self.assertIn(constants.BASE_PORT, amp_flow.provides)
|
||||||
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
|
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
|
||||||
@ -296,7 +297,7 @@ class TestAmphoraFlows(base.TestCase):
|
|||||||
self.assertIn(constants.VIP_SG_ID, amp_flow.provides)
|
self.assertIn(constants.VIP_SG_ID, amp_flow.provides)
|
||||||
|
|
||||||
self.assertEqual(7, len(amp_flow.requires))
|
self.assertEqual(7, len(amp_flow.requires))
|
||||||
self.assertEqual(13, len(amp_flow.provides))
|
self.assertEqual(14, len(amp_flow.provides))
|
||||||
|
|
||||||
def test_get_failover_flow_standalone(self, mock_get_net_driver):
|
def test_get_failover_flow_standalone(self, mock_get_net_driver):
|
||||||
failed_amphora = data_models.Amphora(
|
failed_amphora = data_models.Amphora(
|
||||||
@ -320,6 +321,7 @@ class TestAmphoraFlows(base.TestCase):
|
|||||||
self.assertIn(constants.AMPHORA, amp_flow.provides)
|
self.assertIn(constants.AMPHORA, amp_flow.provides)
|
||||||
self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
|
self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
|
||||||
self.assertIn(constants.AMPHORAE, amp_flow.provides)
|
self.assertIn(constants.AMPHORAE, amp_flow.provides)
|
||||||
|
self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides)
|
||||||
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
|
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
|
||||||
self.assertIn(constants.BASE_PORT, amp_flow.provides)
|
self.assertIn(constants.BASE_PORT, amp_flow.provides)
|
||||||
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
|
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
|
||||||
@ -330,7 +332,7 @@ class TestAmphoraFlows(base.TestCase):
|
|||||||
self.assertIn(constants.VIP_SG_ID, amp_flow.provides)
|
self.assertIn(constants.VIP_SG_ID, amp_flow.provides)
|
||||||
|
|
||||||
self.assertEqual(7, len(amp_flow.requires))
|
self.assertEqual(7, len(amp_flow.requires))
|
||||||
self.assertEqual(12, len(amp_flow.provides))
|
self.assertEqual(13, len(amp_flow.provides))
|
||||||
|
|
||||||
def test_get_failover_flow_bogus_role(self, mock_get_net_driver):
|
def test_get_failover_flow_bogus_role(self, mock_get_net_driver):
|
||||||
failed_amphora = data_models.Amphora(id=uuidutils.generate_uuid(),
|
failed_amphora = data_models.Amphora(id=uuidutils.generate_uuid(),
|
||||||
@ -368,12 +370,30 @@ class TestAmphoraFlows(base.TestCase):
|
|||||||
|
|
||||||
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides)
|
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides)
|
||||||
self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides)
|
self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides)
|
||||||
|
self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.provides)
|
||||||
|
|
||||||
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
|
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
|
||||||
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
|
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
|
||||||
|
self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires)
|
||||||
|
|
||||||
|
self.assertEqual(3, len(vrrp_subflow.provides))
|
||||||
|
self.assertEqual(3, len(vrrp_subflow.requires))
|
||||||
|
|
||||||
|
def test_get_vrrp_subflow_dont_get_status(self, mock_get_net_driver):
|
||||||
|
vrrp_subflow = self.AmpFlow.get_vrrp_subflow('123',
|
||||||
|
get_amphorae_status=False)
|
||||||
|
|
||||||
|
self.assertIsInstance(vrrp_subflow, flow.Flow)
|
||||||
|
|
||||||
|
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides)
|
||||||
|
self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides)
|
||||||
|
|
||||||
|
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
|
||||||
|
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
|
||||||
|
self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.requires)
|
||||||
|
|
||||||
self.assertEqual(2, len(vrrp_subflow.provides))
|
self.assertEqual(2, len(vrrp_subflow.provides))
|
||||||
self.assertEqual(2, len(vrrp_subflow.requires))
|
self.assertEqual(3, len(vrrp_subflow.requires))
|
||||||
|
|
||||||
def test_get_vrrp_subflow_dont_create_vrrp_group(
|
def test_get_vrrp_subflow_dont_create_vrrp_group(
|
||||||
self, mock_get_net_driver):
|
self, mock_get_net_driver):
|
||||||
@ -384,12 +404,14 @@ class TestAmphoraFlows(base.TestCase):
|
|||||||
|
|
||||||
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides)
|
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, vrrp_subflow.provides)
|
||||||
self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides)
|
self.assertIn(constants.AMP_VRRP_INT, vrrp_subflow.provides)
|
||||||
|
self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.provides)
|
||||||
|
|
||||||
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
|
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
|
||||||
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
|
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
|
||||||
|
self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires)
|
||||||
|
|
||||||
self.assertEqual(2, len(vrrp_subflow.provides))
|
self.assertEqual(3, len(vrrp_subflow.provides))
|
||||||
self.assertEqual(2, len(vrrp_subflow.requires))
|
self.assertEqual(3, len(vrrp_subflow.requires))
|
||||||
|
|
||||||
def test_update_amphora_config_flow(self, mock_get_net_driver):
|
def test_update_amphora_config_flow(self, mock_get_net_driver):
|
||||||
|
|
||||||
|
@ -174,14 +174,16 @@ class TestLoadBalancerFlows(base.TestCase):
|
|||||||
|
|
||||||
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
|
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
|
||||||
self.assertIn(constants.UPDATE_DICT, amp_flow.requires)
|
self.assertIn(constants.UPDATE_DICT, amp_flow.requires)
|
||||||
|
self.assertIn(constants.AMPHORA_ID, amp_flow.requires)
|
||||||
|
|
||||||
self.assertIn(constants.AMPHORAE, amp_flow.provides)
|
self.assertIn(constants.AMPHORAE, amp_flow.provides)
|
||||||
|
self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides)
|
||||||
self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides)
|
self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides)
|
||||||
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
|
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
|
||||||
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
|
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
|
||||||
|
|
||||||
self.assertEqual(2, len(amp_flow.requires), amp_flow.requires)
|
self.assertEqual(3, len(amp_flow.requires), amp_flow.requires)
|
||||||
self.assertEqual(4, len(amp_flow.provides), amp_flow.provides)
|
self.assertEqual(5, len(amp_flow.provides), amp_flow.provides)
|
||||||
|
|
||||||
amp_flow = self.LBFlow.get_post_lb_amp_association_flow(
|
amp_flow = self.LBFlow.get_post_lb_amp_association_flow(
|
||||||
'123', constants.TOPOLOGY_ACTIVE_STANDBY)
|
'123', constants.TOPOLOGY_ACTIVE_STANDBY)
|
||||||
@ -190,14 +192,16 @@ class TestLoadBalancerFlows(base.TestCase):
|
|||||||
|
|
||||||
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
|
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
|
||||||
self.assertIn(constants.UPDATE_DICT, amp_flow.requires)
|
self.assertIn(constants.UPDATE_DICT, amp_flow.requires)
|
||||||
|
self.assertIn(constants.AMPHORA_ID, amp_flow.requires)
|
||||||
|
|
||||||
self.assertIn(constants.AMPHORAE, amp_flow.provides)
|
self.assertIn(constants.AMPHORAE, amp_flow.provides)
|
||||||
|
self.assertIn(constants.AMPHORAE_STATUS, amp_flow.provides)
|
||||||
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
|
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, amp_flow.provides)
|
||||||
self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides)
|
self.assertIn(constants.AMP_VRRP_INT, amp_flow.provides)
|
||||||
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
|
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
|
||||||
|
|
||||||
self.assertEqual(2, len(amp_flow.requires), amp_flow.requires)
|
self.assertEqual(3, len(amp_flow.requires), amp_flow.requires)
|
||||||
self.assertEqual(4, len(amp_flow.provides), amp_flow.provides)
|
self.assertEqual(5, len(amp_flow.provides), amp_flow.provides)
|
||||||
|
|
||||||
def test_get_create_load_balancer_flows_single_listeners(
|
def test_get_create_load_balancer_flows_single_listeners(
|
||||||
self, mock_get_net_driver):
|
self, mock_get_net_driver):
|
||||||
@ -255,6 +259,7 @@ class TestLoadBalancerFlows(base.TestCase):
|
|||||||
self.assertIn(constants.AMPHORA_ID, create_flow.provides)
|
self.assertIn(constants.AMPHORA_ID, create_flow.provides)
|
||||||
self.assertIn(constants.AMPHORA_NETWORK_CONFIG, create_flow.provides)
|
self.assertIn(constants.AMPHORA_NETWORK_CONFIG, create_flow.provides)
|
||||||
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, create_flow.provides)
|
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, create_flow.provides)
|
||||||
|
self.assertIn(constants.AMPHORAE_STATUS, create_flow.provides)
|
||||||
self.assertIn(constants.COMPUTE_ID, create_flow.provides)
|
self.assertIn(constants.COMPUTE_ID, create_flow.provides)
|
||||||
self.assertIn(constants.COMPUTE_OBJ, create_flow.provides)
|
self.assertIn(constants.COMPUTE_OBJ, create_flow.provides)
|
||||||
self.assertIn(constants.DELTAS, create_flow.provides)
|
self.assertIn(constants.DELTAS, create_flow.provides)
|
||||||
@ -265,7 +270,7 @@ class TestLoadBalancerFlows(base.TestCase):
|
|||||||
self.assertIn(constants.VIP, create_flow.provides)
|
self.assertIn(constants.VIP, create_flow.provides)
|
||||||
|
|
||||||
self.assertEqual(6, len(create_flow.requires), create_flow.requires)
|
self.assertEqual(6, len(create_flow.requires), create_flow.requires)
|
||||||
self.assertEqual(16, len(create_flow.provides),
|
self.assertEqual(17, len(create_flow.provides),
|
||||||
create_flow.provides)
|
create_flow.provides)
|
||||||
|
|
||||||
def _test_get_failover_LB_flow_single(self, amphorae):
|
def _test_get_failover_LB_flow_single(self, amphorae):
|
||||||
|
@ -132,17 +132,35 @@ class TestAmphoraDriverTasks(base.TestCase):
|
|||||||
|
|
||||||
mock_amphora_repo_get.return_value = _db_amphora_mock
|
mock_amphora_repo_get.return_value = _db_amphora_mock
|
||||||
mock_lb_get.return_value = _db_load_balancer_mock
|
mock_lb_get.return_value = _db_load_balancer_mock
|
||||||
|
amphorae_status = {
|
||||||
|
_amphora_mock[constants.ID]: {
|
||||||
|
constants.UNREACHABLE: False
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate()
|
amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate()
|
||||||
amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock],
|
amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock],
|
||||||
self.timeout_dict)
|
amphorae_status, self.timeout_dict)
|
||||||
|
|
||||||
mock_driver.update_amphora_listeners.assert_called_once_with(
|
mock_driver.update_amphora_listeners.assert_called_once_with(
|
||||||
_db_load_balancer_mock, _db_amphora_mock, self.timeout_dict)
|
_db_load_balancer_mock, _db_amphora_mock, self.timeout_dict)
|
||||||
|
|
||||||
|
# Unreachable amp
|
||||||
|
mock_driver.reset_mock()
|
||||||
|
amphorae_status = {
|
||||||
|
_amphora_mock[constants.ID]: {
|
||||||
|
constants.UNREACHABLE: True
|
||||||
|
}
|
||||||
|
}
|
||||||
|
amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock],
|
||||||
|
amphorae_status, self.timeout_dict)
|
||||||
|
mock_driver.update_amphora_listeners.assert_not_called()
|
||||||
|
|
||||||
|
# Test exception
|
||||||
mock_driver.update_amphora_listeners.side_effect = Exception('boom')
|
mock_driver.update_amphora_listeners.side_effect = Exception('boom')
|
||||||
|
|
||||||
amp_list_update_obj.execute(_LB_mock, 0,
|
amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], {},
|
||||||
[_amphora_mock], self.timeout_dict)
|
self.timeout_dict)
|
||||||
|
|
||||||
mock_amphora_repo_update.assert_called_once_with(
|
mock_amphora_repo_update.assert_called_once_with(
|
||||||
_session_mock, AMP_ID, status=constants.ERROR)
|
_session_mock, AMP_ID, status=constants.ERROR)
|
||||||
@ -194,37 +212,54 @@ class TestAmphoraDriverTasks(base.TestCase):
|
|||||||
mock_driver, mock_generate_uuid, mock_log, mock_get_session,
|
mock_driver, mock_generate_uuid, mock_log, mock_get_session,
|
||||||
mock_listener_repo_get, mock_listener_repo_update,
|
mock_listener_repo_get, mock_listener_repo_update,
|
||||||
mock_amphora_repo_get, mock_amphora_repo_update):
|
mock_amphora_repo_get, mock_amphora_repo_update):
|
||||||
amphora_mock = mock.MagicMock()
|
|
||||||
listeners_reload_obj = (
|
listeners_reload_obj = (
|
||||||
amphora_driver_tasks.AmphoraIndexListenersReload())
|
amphora_driver_tasks.AmphoraIndexListenersReload())
|
||||||
mock_lb = mock.MagicMock()
|
mock_lb = mock.MagicMock()
|
||||||
mock_listener = mock.MagicMock()
|
mock_listener = mock.MagicMock()
|
||||||
mock_listener.id = '12345'
|
mock_listener.id = '12345'
|
||||||
mock_amphora_repo_get.return_value = amphora_mock
|
mock_amphora_repo_get.return_value = _amphora_mock
|
||||||
mock_lb_repo_get.return_value = mock_lb
|
mock_lb_repo_get.return_value = mock_lb
|
||||||
mock_driver.reload.side_effect = [mock.DEFAULT, Exception('boom')]
|
mock_driver.reload.side_effect = [mock.DEFAULT, Exception('boom')]
|
||||||
|
|
||||||
# Test no listeners
|
# Test no listeners
|
||||||
mock_lb.listeners = None
|
mock_lb.listeners = None
|
||||||
listeners_reload_obj.execute(mock_lb, 0, None)
|
listeners_reload_obj.execute(mock_lb, 0, None, {})
|
||||||
mock_driver.reload.assert_not_called()
|
mock_driver.reload.assert_not_called()
|
||||||
|
|
||||||
# Test with listeners
|
# Test with listeners
|
||||||
mock_driver.start.reset_mock()
|
amphorae_status = {
|
||||||
|
_amphora_mock[constants.ID]: {
|
||||||
|
constants.UNREACHABLE: False
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mock_driver.reload.reset_mock()
|
||||||
mock_lb.listeners = [mock_listener]
|
mock_lb.listeners = [mock_listener]
|
||||||
listeners_reload_obj.execute(mock_lb, 0, [amphora_mock],
|
listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock],
|
||||||
|
amphorae_status,
|
||||||
timeout_dict=self.timeout_dict)
|
timeout_dict=self.timeout_dict)
|
||||||
mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock,
|
mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock,
|
||||||
self.timeout_dict)
|
self.timeout_dict)
|
||||||
|
|
||||||
|
# Unreachable amp
|
||||||
|
amphorae_status = {
|
||||||
|
_amphora_mock[constants.ID]: {
|
||||||
|
constants.UNREACHABLE: True
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mock_driver.reload.reset_mock()
|
||||||
|
listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock],
|
||||||
|
amphorae_status,
|
||||||
|
timeout_dict=self.timeout_dict)
|
||||||
|
mock_driver.reload.assert_not_called()
|
||||||
|
|
||||||
# Test with reload exception
|
# Test with reload exception
|
||||||
mock_driver.reload.reset_mock()
|
mock_driver.reload.reset_mock()
|
||||||
listeners_reload_obj.execute(mock_lb, 0, [amphora_mock],
|
listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], {},
|
||||||
timeout_dict=self.timeout_dict)
|
timeout_dict=self.timeout_dict)
|
||||||
mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock,
|
mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock,
|
||||||
self.timeout_dict)
|
self.timeout_dict)
|
||||||
mock_amphora_repo_update.assert_called_once_with(
|
mock_amphora_repo_update.assert_called_once_with(
|
||||||
_session_mock, amphora_mock[constants.ID],
|
_session_mock, _amphora_mock[constants.ID],
|
||||||
status=constants.ERROR)
|
status=constants.ERROR)
|
||||||
|
|
||||||
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
|
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
|
||||||
@ -728,6 +763,11 @@ class TestAmphoraDriverTasks(base.TestCase):
|
|||||||
FAKE_INTERFACE = 'fake0'
|
FAKE_INTERFACE = 'fake0'
|
||||||
mock_driver.get_interface_from_ip.side_effect = [FAKE_INTERFACE,
|
mock_driver.get_interface_from_ip.side_effect = [FAKE_INTERFACE,
|
||||||
Exception('boom')]
|
Exception('boom')]
|
||||||
|
amphorae_status = {
|
||||||
|
_amphora_mock[constants.ID]: {
|
||||||
|
constants.UNREACHABLE: False
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
timeout_dict = {constants.CONN_MAX_RETRIES: CONN_MAX_RETRIES,
|
timeout_dict = {constants.CONN_MAX_RETRIES: CONN_MAX_RETRIES,
|
||||||
constants.CONN_RETRY_INTERVAL: CONN_RETRY_INTERVAL}
|
constants.CONN_RETRY_INTERVAL: CONN_RETRY_INTERVAL}
|
||||||
@ -735,17 +775,28 @@ class TestAmphoraDriverTasks(base.TestCase):
|
|||||||
amphora_update_vrrp_interface_obj = (
|
amphora_update_vrrp_interface_obj = (
|
||||||
amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface())
|
amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface())
|
||||||
amphora_update_vrrp_interface_obj.execute(
|
amphora_update_vrrp_interface_obj.execute(
|
||||||
0, [_amphora_mock], timeout_dict)
|
0, [_amphora_mock], amphorae_status, timeout_dict)
|
||||||
mock_driver.get_interface_from_ip.assert_called_once_with(
|
mock_driver.get_interface_from_ip.assert_called_once_with(
|
||||||
_db_amphora_mock, _db_amphora_mock.vrrp_ip,
|
_db_amphora_mock, _db_amphora_mock.vrrp_ip,
|
||||||
timeout_dict=timeout_dict)
|
timeout_dict=timeout_dict)
|
||||||
mock_amphora_repo_update.assert_called_once_with(
|
mock_amphora_repo_update.assert_called_once_with(
|
||||||
_session_mock, _db_amphora_mock.id, vrrp_interface=FAKE_INTERFACE)
|
_session_mock, _db_amphora_mock.id, vrrp_interface=FAKE_INTERFACE)
|
||||||
|
|
||||||
|
# Unreachable amp
|
||||||
|
mock_driver.reset_mock()
|
||||||
|
amphorae_status = {
|
||||||
|
_amphora_mock[constants.ID]: {
|
||||||
|
constants.UNREACHABLE: True
|
||||||
|
}
|
||||||
|
}
|
||||||
|
amphora_update_vrrp_interface_obj.execute(
|
||||||
|
0, [_amphora_mock], amphorae_status, timeout_dict)
|
||||||
|
mock_driver.get_interface_from_ip.assert_not_called()
|
||||||
|
|
||||||
# Test with an exception
|
# Test with an exception
|
||||||
mock_amphora_repo_update.reset_mock()
|
mock_amphora_repo_update.reset_mock()
|
||||||
amphora_update_vrrp_interface_obj.execute(
|
amphora_update_vrrp_interface_obj.execute(
|
||||||
0, [_amphora_mock], timeout_dict)
|
0, [_amphora_mock], {}, timeout_dict)
|
||||||
mock_amphora_repo_update.assert_called_once_with(
|
mock_amphora_repo_update.assert_called_once_with(
|
||||||
_session_mock, _db_amphora_mock.id, status=constants.ERROR)
|
_session_mock, _db_amphora_mock.id, status=constants.ERROR)
|
||||||
|
|
||||||
@ -796,20 +847,40 @@ class TestAmphoraDriverTasks(base.TestCase):
|
|||||||
Exception('boom')]
|
Exception('boom')]
|
||||||
mock_lb_get.return_value = _db_load_balancer_mock
|
mock_lb_get.return_value = _db_load_balancer_mock
|
||||||
mock_amphora_repo_get.return_value = _db_amphora_mock
|
mock_amphora_repo_get.return_value = _db_amphora_mock
|
||||||
|
amphorae_status = {
|
||||||
|
_amphora_mock[constants.ID]: {
|
||||||
|
constants.UNREACHABLE: False
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
amphora_vrrp_update_obj = (
|
amphora_vrrp_update_obj = (
|
||||||
amphora_driver_tasks.AmphoraIndexVRRPUpdate())
|
amphora_driver_tasks.AmphoraIndexVRRPUpdate())
|
||||||
|
|
||||||
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
|
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
|
||||||
0, [_amphora_mock], 'fakeint0',
|
0, [_amphora_mock], amphorae_status,
|
||||||
|
'fakeint0',
|
||||||
timeout_dict=self.timeout_dict)
|
timeout_dict=self.timeout_dict)
|
||||||
mock_driver.update_vrrp_conf.assert_called_once_with(
|
mock_driver.update_vrrp_conf.assert_called_once_with(
|
||||||
_db_load_balancer_mock, amphorae_network_config, _db_amphora_mock,
|
_db_load_balancer_mock, amphorae_network_config, _db_amphora_mock,
|
||||||
self.timeout_dict)
|
self.timeout_dict)
|
||||||
|
|
||||||
|
# Unreachable amp
|
||||||
|
amphorae_status = {
|
||||||
|
_amphora_mock[constants.ID]: {
|
||||||
|
constants.UNREACHABLE: True
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mock_amphora_repo_update.reset_mock()
|
||||||
|
mock_driver.update_vrrp_conf.reset_mock()
|
||||||
|
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
|
||||||
|
0, [_amphora_mock], amphorae_status,
|
||||||
|
None)
|
||||||
|
mock_driver.update_vrrp_conf.assert_not_called()
|
||||||
|
|
||||||
# Test with an exception
|
# Test with an exception
|
||||||
mock_amphora_repo_update.reset_mock()
|
mock_amphora_repo_update.reset_mock()
|
||||||
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
|
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
|
||||||
0, [_amphora_mock], 'fakeint0')
|
0, [_amphora_mock], {}, 'fakeint0')
|
||||||
mock_amphora_repo_update.assert_called_once_with(
|
mock_amphora_repo_update.assert_called_once_with(
|
||||||
_session_mock, _db_amphora_mock.id, status=constants.ERROR)
|
_session_mock, _db_amphora_mock.id, status=constants.ERROR)
|
||||||
|
|
||||||
@ -840,19 +911,36 @@ class TestAmphoraDriverTasks(base.TestCase):
|
|||||||
mock_amphora_repo_get,
|
mock_amphora_repo_get,
|
||||||
mock_amphora_repo_update):
|
mock_amphora_repo_update):
|
||||||
mock_amphora_repo_get.return_value = _db_amphora_mock
|
mock_amphora_repo_get.return_value = _db_amphora_mock
|
||||||
|
amphorae_status = {
|
||||||
|
_amphora_mock[constants.ID]: {
|
||||||
|
constants.UNREACHABLE: False
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
amphora_vrrp_start_obj = (
|
amphora_vrrp_start_obj = (
|
||||||
amphora_driver_tasks.AmphoraIndexVRRPStart())
|
amphora_driver_tasks.AmphoraIndexVRRPStart())
|
||||||
mock_driver.start_vrrp_service.side_effect = [mock.DEFAULT,
|
mock_driver.start_vrrp_service.side_effect = [mock.DEFAULT,
|
||||||
Exception('boom')]
|
Exception('boom')]
|
||||||
|
|
||||||
amphora_vrrp_start_obj.execute(0, [_amphora_mock],
|
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status,
|
||||||
timeout_dict=self.timeout_dict)
|
timeout_dict=self.timeout_dict)
|
||||||
mock_driver.start_vrrp_service.assert_called_once_with(
|
mock_driver.start_vrrp_service.assert_called_once_with(
|
||||||
_db_amphora_mock, self.timeout_dict)
|
_db_amphora_mock, self.timeout_dict)
|
||||||
|
|
||||||
|
# Unreachable amp
|
||||||
|
mock_driver.start_vrrp_service.reset_mock()
|
||||||
|
amphorae_status = {
|
||||||
|
_amphora_mock[constants.ID]: {
|
||||||
|
constants.UNREACHABLE: True
|
||||||
|
}
|
||||||
|
}
|
||||||
|
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status,
|
||||||
|
timeout_dict=self.timeout_dict)
|
||||||
|
mock_driver.start_vrrp_service.assert_not_called()
|
||||||
|
|
||||||
# Test with a start exception
|
# Test with a start exception
|
||||||
mock_driver.start_vrrp_service.reset_mock()
|
mock_driver.start_vrrp_service.reset_mock()
|
||||||
amphora_vrrp_start_obj.execute(0, [_amphora_mock],
|
amphora_vrrp_start_obj.execute(0, [_amphora_mock], {},
|
||||||
timeout_dict=self.timeout_dict)
|
timeout_dict=self.timeout_dict)
|
||||||
mock_driver.start_vrrp_service.assert_called_once_with(
|
mock_driver.start_vrrp_service.assert_called_once_with(
|
||||||
_db_amphora_mock, self.timeout_dict)
|
_db_amphora_mock, self.timeout_dict)
|
||||||
@ -930,3 +1018,74 @@ class TestAmphoraDriverTasks(base.TestCase):
|
|||||||
self.assertRaises(driver_except.TimeOutException,
|
self.assertRaises(driver_except.TimeOutException,
|
||||||
amp_config_update_obj.execute,
|
amp_config_update_obj.execute,
|
||||||
_amphora_mock, flavor)
|
_amphora_mock, flavor)
|
||||||
|
|
||||||
|
def test_amphorae_get_connectivity_status(self,
|
||||||
|
mock_driver,
|
||||||
|
mock_generate_uuid,
|
||||||
|
mock_log,
|
||||||
|
mock_get_session,
|
||||||
|
mock_listener_repo_get,
|
||||||
|
mock_listener_repo_update,
|
||||||
|
mock_amphora_repo_get,
|
||||||
|
mock_amphora_repo_update):
|
||||||
|
amphora1_mock = mock.MagicMock()
|
||||||
|
amphora1_mock[constants.ID] = 'id1'
|
||||||
|
amphora2_mock = mock.MagicMock()
|
||||||
|
amphora2_mock[constants.ID] = 'id2'
|
||||||
|
db_amphora1_mock = mock.Mock()
|
||||||
|
db_amphora2_mock = mock.Mock()
|
||||||
|
|
||||||
|
amp_get_connectivity_status = (
|
||||||
|
amphora_driver_tasks.AmphoraeGetConnectivityStatus())
|
||||||
|
|
||||||
|
# All amphorae reachable
|
||||||
|
mock_amphora_repo_get.side_effect = [
|
||||||
|
db_amphora1_mock,
|
||||||
|
db_amphora2_mock]
|
||||||
|
mock_driver.check.return_value = None
|
||||||
|
|
||||||
|
ret = amp_get_connectivity_status.execute(
|
||||||
|
[amphora1_mock, amphora2_mock],
|
||||||
|
amphora1_mock[constants.ID],
|
||||||
|
timeout_dict=self.timeout_dict)
|
||||||
|
mock_driver.check.assert_has_calls(
|
||||||
|
[mock.call(db_amphora1_mock, timeout_dict=self.timeout_dict),
|
||||||
|
mock.call(db_amphora2_mock, timeout_dict=self.timeout_dict)])
|
||||||
|
self.assertFalse(
|
||||||
|
ret[amphora1_mock[constants.ID]][constants.UNREACHABLE])
|
||||||
|
self.assertFalse(
|
||||||
|
ret[amphora2_mock[constants.ID]][constants.UNREACHABLE])
|
||||||
|
|
||||||
|
# amphora1 unreachable
|
||||||
|
mock_driver.check.reset_mock()
|
||||||
|
mock_amphora_repo_get.side_effect = [
|
||||||
|
db_amphora1_mock,
|
||||||
|
db_amphora2_mock]
|
||||||
|
mock_driver.check.side_effect = [
|
||||||
|
driver_except.TimeOutException, None]
|
||||||
|
self.assertRaises(driver_except.TimeOutException,
|
||||||
|
amp_get_connectivity_status.execute,
|
||||||
|
[amphora1_mock, amphora2_mock],
|
||||||
|
amphora1_mock[constants.ID],
|
||||||
|
timeout_dict=self.timeout_dict)
|
||||||
|
mock_driver.check.assert_called_with(
|
||||||
|
db_amphora1_mock, timeout_dict=self.timeout_dict)
|
||||||
|
|
||||||
|
# amphora2 unreachable
|
||||||
|
mock_driver.check.reset_mock()
|
||||||
|
mock_amphora_repo_get.side_effect = [
|
||||||
|
db_amphora1_mock,
|
||||||
|
db_amphora2_mock]
|
||||||
|
mock_driver.check.side_effect = [
|
||||||
|
None, driver_except.TimeOutException]
|
||||||
|
ret = amp_get_connectivity_status.execute(
|
||||||
|
[amphora1_mock, amphora2_mock],
|
||||||
|
amphora1_mock[constants.ID],
|
||||||
|
timeout_dict=self.timeout_dict)
|
||||||
|
mock_driver.check.assert_has_calls(
|
||||||
|
[mock.call(db_amphora1_mock, timeout_dict=self.timeout_dict),
|
||||||
|
mock.call(db_amphora2_mock, timeout_dict=self.timeout_dict)])
|
||||||
|
self.assertFalse(
|
||||||
|
ret[amphora1_mock[constants.ID]][constants.UNREACHABLE])
|
||||||
|
self.assertTrue(
|
||||||
|
ret[amphora2_mock[constants.ID]][constants.UNREACHABLE])
|
||||||
|
@ -0,0 +1,7 @@
|
|||||||
|
---
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
Reduce the duration of the failovers of ACTIVE_STANDBY load balancers. Many
|
||||||
|
updates of an unreachable amphora may have been attempted during a
|
||||||
|
failover, now if an amphora is not reachable at the first update, the other
|
||||||
|
updates are skipped.
|
Loading…
Reference in New Issue
Block a user