Fix failover when multiple amphorae have failed
If a load balancer loses more than one amphora at the same time, the failover process fails and leaves the load balancer in provisioning status ERROR. This patch resolves that by failing over one amphora at a time, marking any other amphorae that have also failed in status ERROR; the health manager then fails over those amphorae in subsequent checks. Healthy amphorae are updated in parallel, and calls to failed amphorae time out quickly via the new "active_connection_max_retries" configuration setting used for "fail-fast" connections.

The patch also updates the amphora failover flow documentation to show the full flow and not just the spares failover flow, updates the amphora driver "get_diagnostics" method to pass instead of error, and adds an AmphoraComputeConnectivityWait task to explicitly wait for a compute instance to come up and be reachable. This allows a longer timeout and clarifies that such failures may be due to compute (nova) failures. Previously the first plug-VIP task performed this wait.

Change-Id: Ief97ddda8261b5bbc54c6824f90ae9c7a2d81701
Story: 2001481
Task: 6202
This commit is contained in: parent 9ca61f2f4a · commit 0139f12c2e
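For orientation before the diff, a minimal sketch of the fail-fast timeout dictionary this patch threads through the amphora driver calls. The constants and config options are introduced by the patch itself; the helper function is hypothetical:

    # Sketch only: the timeout_dict shape introduced by this patch. The
    # keys come from octavia.common.constants; any key omitted falls back
    # to the [haproxy_amphora] config defaults inside the REST client.
    from oslo_config import cfg

    from octavia.common import constants

    CONF = cfg.CONF


    def build_failover_timeout_dict():
        # Hypothetical helper: fail-fast retries for amphorae that should
        # already be active, as opposed to the longer boot-time
        # connection_max_retries used while an instance is still coming up.
        # Note: 'active_connection_rety_interval' is spelled as in the patch.
        return {
            constants.CONN_MAX_RETRIES:
                CONF.haproxy_amphora.active_connection_max_retries,
            constants.CONN_RETRY_INTERVAL:
                CONF.haproxy_amphora.active_connection_rety_interval,
        }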
@@ -198,6 +198,12 @@
 #
 # rest_request_conn_timeout = 10
 # rest_request_read_timeout = 60
+#
+# These "active" timeouts are used once the amphora should already
+# be fully up and active. These values are lower than the other values to
+# facilitate "fail fast" scenarios like failovers
+# active_connection_max_retries = 15
+# active_connection_rety_interval = 2
 
 [controller_worker]
 # workers = 1
@@ -21,6 +21,25 @@ import six
 @six.add_metaclass(abc.ABCMeta)
 class AmphoraLoadBalancerDriver(object):
 
+    @abc.abstractmethod
+    def update_amphora_listeners(self, listeners, amphora_id, timeout_dict):
+        """Update the amphora with a new configuration.
+
+        :param listeners: List of listeners to update.
+        :type listeners: list
+        :param amphora_id: The ID of the amphora to update
+        :type amphora_id: string
+        :param timeout_dict: Dictionary of timeout values for calls to the
+                             amphora. May contain: req_conn_timeout,
+                             req_read_timeout, conn_max_retries,
+                             conn_retry_interval
+        :returns: None
+
+        Builds a new configuration, pushes it to the amphora, and reloads
+        the listener on one amphora.
+        """
+        pass
+
     @abc.abstractmethod
     def update(self, listener, vip):
         """Update the amphora with a new configuration.
@@ -30,7 +49,7 @@ class AmphoraLoadBalancerDriver(object):
         :type listener: object
         :param vip: vip object, need to use its ip_address property
         :type vip: object
-        :returns: return a value list (listener, vip, status flag--update)
+        :returns: None
 
         At this moment, we just build the basic structure for testing, will
         add more function along with the development.
@@ -60,6 +60,45 @@ class HaproxyAmphoraLoadBalancerDriver(
             haproxy_template=CONF.haproxy_amphora.haproxy_template,
             connection_logging=CONF.haproxy_amphora.connection_logging)
 
+    def update_amphora_listeners(self, listeners, amphora_index,
+                                 amphorae, timeout_dict=None):
+        """Update the amphora with a new configuration.
+
+        :param listeners: List of listeners to update.
+        :type listeners: list
+        :param amphora_index: The index of the amphora to update
+        :type amphora_index: integer
+        :param amphorae: List of amphorae
+        :type amphorae: list
+        :param timeout_dict: Dictionary of timeout values for calls to the
+                             amphora. May contain: req_conn_timeout,
+                             req_read_timeout, conn_max_retries,
+                             conn_retry_interval
+        :returns: None
+
+        Updates the configuration of the listeners on a single amphora.
+        """
+        # if the amphora does not yet have listeners, no need to update them.
+        if not listeners:
+            LOG.debug('No listeners found to update.')
+            return
+        amp = amphorae[amphora_index]
+        if amp is None or amp.status == consts.DELETED:
+            return
+        # TODO(johnsom) remove when we don't have a process per listener
+        for listener in listeners:
+            LOG.debug("%s updating listener %s on amphora %s",
+                      self.__class__.__name__, listener.id, amp.id)
+            certs = self._process_tls_certificates(listener)
+            # Generate HaProxy configuration from listener object
+            config = self.jinja.build_config(
+                host_amphora=amp,
+                listener=listener,
+                tls_cert=certs['tls_cert'],
+                user_group=CONF.haproxy_amphora.user_group)
+            self.client.upload_config(amp, listener.id, config,
+                                      timeout_dict=timeout_dict)
+            self.client.reload_listener(amp, listener.id,
+                                        timeout_dict=timeout_dict)
+
     def update(self, listener, vip):
         LOG.debug("Amphora %s haproxy, updating listener %s, vip %s",
                   self.__class__.__name__, listener.protocol_port,
@@ -85,25 +124,29 @@ class HaproxyAmphoraLoadBalancerDriver(
                   self.__class__.__name__, amp.id)
         self.client.update_cert_for_rotation(amp, pem)
 
-    def _apply(self, func, listener=None, *args):
-        for amp in listener.load_balancer.amphorae:
-            if amp.status != consts.DELETED:
-                func(amp, listener.id, *args)
+    def _apply(self, func, listener=None, amphora=None, *args):
+        if amphora is None:
+            for amp in listener.load_balancer.amphorae:
+                if amp.status != consts.DELETED:
+                    func(amp, listener.id, *args)
+        else:
+            if amphora.status != consts.DELETED:
+                func(amphora, listener.id, *args)
 
     def stop(self, listener, vip):
         self._apply(self.client.stop_listener, listener)
 
-    def start(self, listener, vip):
-        self._apply(self.client.start_listener, listener)
+    def start(self, listener, vip, amphora=None):
+        self._apply(self.client.start_listener, listener, amphora)
 
     def delete(self, listener, vip):
         self._apply(self.client.delete_listener, listener)
 
     def get_info(self, amphora):
-        self.driver.get_info(amphora.lb_network_ip)
+        return self.client.get_info(amphora)
 
     def get_diagnostics(self, amphora):
-        self.driver.get_diagnostics(amphora.lb_network_ip)
+        pass
 
     def finalize_amphora(self, amphora):
         pass
@@ -186,7 +229,7 @@ class HaproxyAmphoraLoadBalancerDriver(
             pem = cert_parser.build_pem(cert)
             md5 = hashlib.md5(pem).hexdigest()  # nosec
             name = '{id}.pem'.format(id=cert.id)
-            self._apply(self._upload_cert, listener, pem, md5, name)
+            self._apply(self._upload_cert, listener, None, pem, md5, name)
 
         return {'tls_cert': tls_cert, 'sni_certs': sni_certs}
@@ -251,17 +294,27 @@ class AmphoraAPIClient(object):
             port=CONF.haproxy_amphora.bind_port,
             version=API_VERSION)
 
-    def request(self, method, amp, path='/', **kwargs):
+    def request(self, method, amp, path='/', timeout_dict=None, **kwargs):
+        cfg_ha_amp = CONF.haproxy_amphora
+        if timeout_dict is None:
+            timeout_dict = {}
+        req_conn_timeout = timeout_dict.get(
+            consts.REQ_CONN_TIMEOUT, cfg_ha_amp.rest_request_conn_timeout)
+        req_read_timeout = timeout_dict.get(
+            consts.REQ_READ_TIMEOUT, cfg_ha_amp.rest_request_read_timeout)
+        conn_max_retries = timeout_dict.get(
+            consts.CONN_MAX_RETRIES, cfg_ha_amp.connection_max_retries)
+        conn_retry_interval = timeout_dict.get(
+            consts.CONN_RETRY_INTERVAL, cfg_ha_amp.connection_retry_interval)
+
         LOG.debug("request url %s", path)
         _request = getattr(self.session, method.lower())
         _url = self._base_url(amp.lb_network_ip) + path
         LOG.debug("request url %s", _url)
-        timeout_tuple = (CONF.haproxy_amphora.rest_request_conn_timeout,
-                         CONF.haproxy_amphora.rest_request_read_timeout)
         reqargs = {
             'verify': CONF.haproxy_amphora.server_ca,
             'url': _url,
-            'timeout': timeout_tuple, }
+            'timeout': (req_conn_timeout, req_read_timeout), }
         reqargs.update(kwargs)
         headers = reqargs.setdefault('headers', {})
@@ -269,7 +322,7 @@ class AmphoraAPIClient(object):
         self.ssl_adapter.uuid = amp.id
         exception = None
         # Keep retrying
-        for a in six.moves.xrange(CONF.haproxy_amphora.connection_max_retries):
+        for a in six.moves.xrange(conn_max_retries):
             try:
                 with warnings.catch_warnings():
                     warnings.filterwarnings(
@@ -303,20 +356,20 @@ class AmphoraAPIClient(object):
             except (requests.ConnectionError, requests.Timeout) as e:
                 exception = e
                 LOG.warning("Could not connect to instance. Retrying.")
-                time.sleep(CONF.haproxy_amphora.connection_retry_interval)
+                time.sleep(conn_retry_interval)
 
         LOG.error("Connection retries (currently set to %(max_retries)s) "
                   "exhausted. The amphora is unavailable. Reason: "
                   "%(exception)s",
-                  {'max_retries': CONF.haproxy_amphora.connection_max_retries,
+                  {'max_retries': conn_max_retries,
                    'exception': exception})
         raise driver_except.TimeOutException()
 
-    def upload_config(self, amp, listener_id, config):
+    def upload_config(self, amp, listener_id, config, timeout_dict=None):
         r = self.put(
             amp,
             'listeners/{amphora_id}/{listener_id}/haproxy'.format(
-                amphora_id=amp.id, listener_id=listener_id),
+                amphora_id=amp.id, listener_id=listener_id), timeout_dict,
             data=config)
         return exc.check_exception(r)
@@ -328,9 +381,9 @@ class AmphoraAPIClient(object):
             return r.json()
         return None
 
-    def _action(self, action, amp, listener_id):
+    def _action(self, action, amp, listener_id, timeout_dict=None):
         r = self.put(amp, 'listeners/{listener_id}/{action}'.format(
-            listener_id=listener_id, action=action))
+            listener_id=listener_id, action=action), timeout_dict=timeout_dict)
         return exc.check_exception(r)
 
     def upload_cert_pem(self, amp, listener_id, pem_filename, pem_file):
@@ -403,8 +456,9 @@ class AmphoraAPIClient(object):
         r = self.put(amp, 'vrrp/{action}'.format(action=action))
         return exc.check_exception(r)
 
-    def get_interface(self, amp, ip_addr):
-        r = self.get(amp, 'interface/{ip_addr}'.format(ip_addr=ip_addr))
+    def get_interface(self, amp, ip_addr, timeout_dict=None):
+        r = self.get(amp, 'interface/{ip_addr}'.format(ip_addr=ip_addr),
+                     timeout_dict=timeout_dict)
         if exc.check_exception(r):
             return r.json()
         return None
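Taken together, the request() changes above mean any REST call can now carry per-call timeouts. A hedged usage sketch — client, amp, listener_id and config are assumed to be in scope; the values mirror the unit tests later in this diff:

    # Sketch only: override the REST timeouts for a single call. Any key
    # missing from the dict falls back to the [haproxy_amphora] defaults;
    # after conn_max_retries failed attempts a TimeOutException is raised.
    timeout_dict = {
        constants.REQ_CONN_TIMEOUT: 1,     # seconds to establish the connection
        constants.REQ_READ_TIMEOUT: 2,     # seconds to wait for the response
        constants.CONN_MAX_RETRIES: 3,     # attempts before TimeOutException
        constants.CONN_RETRY_INTERVAL: 4,  # seconds between attempts
    }
    client.upload_config(amp, listener_id, config, timeout_dict=timeout_dict)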
@@ -90,5 +90,6 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
 
         self.client.reload_vrrp(amp)
 
-    def get_vrrp_interface(self, amphora):
-        return self.client.get_interface(amphora, amphora.vrrp_ip)['interface']
+    def get_vrrp_interface(self, amphora, timeout_dict=None):
+        return self.client.get_interface(
+            amphora, amphora.vrrp_ip, timeout_dict=timeout_dict)['interface']
@@ -37,6 +37,14 @@ class NoopManager(object):
         super(NoopManager, self).__init__()
         self.amphoraconfig = {}
 
+    def update_amphora_listeners(self, listeners, amphora_id, timeout_dict):
+        for listener in listeners:
+            LOG.debug("Amphora noop driver update_amphora_listeners, "
+                      "listener %s, amphora %s, timeouts %s", listener.id,
+                      amphora_id, timeout_dict)
+            self.amphoraconfig[(listener.id, amphora_id)] = (
+                listener, amphora_id, timeout_dict, "update_amp")
+
     def update(self, listener, vip):
         LOG.debug("Amphora %s no-op, update listener %s, vip %s",
                   self.__class__.__name__, listener.protocol_port,
@@ -106,6 +114,11 @@ class NoopAmphoraLoadBalancerDriver(
         super(NoopAmphoraLoadBalancerDriver, self).__init__()
         self.driver = NoopManager()
 
+    def update_amphora_listeners(self, listeners, amphora_id, timeout_dict):
+
+        self.driver.update_amphora_listeners(listeners, amphora_id,
+                                             timeout_dict)
+
     def update(self, listener, vip):
 
         self.driver.update(listener, vip)
@@ -240,6 +240,13 @@ haproxy_amphora_opts = [
                default=5,
                help=_('Retry timeout between connection attempts in '
                       'seconds.')),
+    cfg.IntOpt('active_connection_max_retries',
+               default=15,
+               help=_('Retry threshold for connecting to active amphorae.')),
+    cfg.IntOpt('active_connection_rety_interval',
+               default=2,
+               help=_('Retry timeout between connection attempts in '
+                      'seconds for active amphora.')),
     cfg.IntOpt('build_rate_limit',
                default=-1,
                help=_('Number of amphorae that could be built per controller'
@@ -195,6 +195,7 @@ FAILED_AMPHORA = 'failed_amphora'
 FAILOVER_AMPHORA = 'failover_amphora'
 AMPHORAE = 'amphorae'
 AMPHORA_ID = 'amphora_id'
+AMPHORA_INDEX = 'amphora_index'
 FAILOVER_AMPHORA_ID = 'failover_amphora_id'
 DELTA = 'delta'
 DELTAS = 'deltas'
@@ -240,6 +241,11 @@ MEMBER_UPDATES = 'member_updates'
 HEALTH_MONITOR_UPDATES = 'health_monitor_updates'
 L7POLICY_UPDATES = 'l7policy_updates'
 L7RULE_UPDATES = 'l7rule_updates'
+TIMEOUT_DICT = 'timeout_dict'
+REQ_CONN_TIMEOUT = 'req_conn_timeout'
+REQ_READ_TIMEOUT = 'req_read_timeout'
+CONN_MAX_RETRIES = 'conn_max_retries'
+CONN_RETRY_INTERVAL = 'conn_retry_interval'
 
 CERT_ROTATE_AMPHORA_FLOW = 'octavia-cert-rotate-amphora-flow'
 CREATE_AMPHORA_FLOW = 'octavia-create-amphora-flow'
@@ -273,6 +279,7 @@ UPDATE_MEMBER_FLOW = 'octavia-update-member-flow'
 UPDATE_POOL_FLOW = 'octavia-update-pool-flow'
 UPDATE_L7POLICY_FLOW = 'octavia-update-l7policy-flow'
 UPDATE_L7RULE_FLOW = 'octavia-update-l7rule-flow'
+UPDATE_AMPS_SUBFLOW = 'octavia-update-amps-subflow'
 
 POST_MAP_AMP_TO_LB_SUBFLOW = 'octavia-post-map-amp-to-lb-subflow'
 CREATE_AMP_FOR_LB_SUBFLOW = 'octavia-create-amp-for-lb-subflow'
@@ -306,6 +313,8 @@ AMP_VRRP_STOP = 'octavia-amphora-vrrp-stop'
 AMP_UPDATE_VRRP_INTF = 'octavia-amphora-update-vrrp-intf'
 CREATE_VRRP_GROUP_FOR_LB = 'octavia-create-vrrp-group-for-lb'
 CREATE_VRRP_SECURITY_RULES = 'octavia-create-vrrp-security-rules'
+AMP_COMPUTE_CONNECTIVITY_WAIT = 'octavia-amp-compute-connectivity-wait'
+AMP_LISTENER_UPDATE = 'octavia-amp-listeners-update'
 
 GENERATE_SERVER_PEM_TASK = 'GenerateServerPEMTask'
@@ -747,15 +747,14 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
 
         # if we run with anti-affinity we need to set the server group
         # as well
-        if CONF.nova.enable_anti_affinity:
-            lb = self._amphora_repo.get_lb_for_amphora(
-                db_apis.get_session(), amp.id)
-            if lb:
-                stored_params[constants.SERVER_GROUP_ID] = lb.server_group_id
+        lb = self._amphora_repo.get_lb_for_amphora(
+            db_apis.get_session(), amp.id)
+        if CONF.nova.enable_anti_affinity and lb:
+            stored_params[constants.SERVER_GROUP_ID] = lb.server_group_id
 
         failover_amphora_tf = self._taskflow_load(
             self._amphora_flows.get_failover_flow(
-                role=amp.role, load_balancer_id=amp.load_balancer_id),
+                role=amp.role, load_balancer=lb),
             store=stored_params)
 
         with tf_logging.DynamicLoggingListener(
@@ -16,6 +16,7 @@
 from oslo_config import cfg
 from taskflow.patterns import graph_flow
 from taskflow.patterns import linear_flow
+from taskflow.patterns import unordered_flow
 
 from octavia.common import constants
 from octavia.controller.worker.tasks import amphora_driver_tasks
@@ -62,12 +63,15 @@ class AmphoraFlows(object):
             provides=constants.COMPUTE_ID))
         create_amphora_flow.add(database_tasks.MarkAmphoraBootingInDB(
             requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
-        create_amphora_flow.add(compute_tasks.ComputeWait(
+        create_amphora_flow.add(compute_tasks.ComputeActiveWait(
             requires=(constants.COMPUTE_ID, constants.AMPHORA_ID),
             provides=constants.COMPUTE_OBJ))
         create_amphora_flow.add(database_tasks.UpdateAmphoraInfo(
             requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
             provides=constants.AMPHORA))
+        create_amphora_flow.add(
+            amphora_driver_tasks.AmphoraComputeConnectivityWait(
+                requires=constants.AMPHORA))
         create_amphora_flow.add(database_tasks.ReloadAmphora(
             requires=constants.AMPHORA_ID,
             provides=constants.AMPHORA))
@@ -172,7 +176,7 @@ class AmphoraFlows(object):
         create_amp_for_lb_subflow.add(database_tasks.MarkAmphoraBootingInDB(
             name=sf_name + '-' + constants.MARK_AMPHORA_BOOTING_INDB,
             requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
-        create_amp_for_lb_subflow.add(compute_tasks.ComputeWait(
+        create_amp_for_lb_subflow.add(compute_tasks.ComputeActiveWait(
             name=sf_name + '-' + constants.COMPUTE_WAIT,
             requires=(constants.COMPUTE_ID, constants.AMPHORA_ID),
             provides=constants.COMPUTE_OBJ))
@@ -180,6 +184,10 @@ class AmphoraFlows(object):
             name=sf_name + '-' + constants.UPDATE_AMPHORA_INFO,
             requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
             provides=constants.AMPHORA))
+        create_amp_for_lb_subflow.add(
+            amphora_driver_tasks.AmphoraComputeConnectivityWait(
+                name=sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT,
+                requires=constants.AMPHORA))
         create_amp_for_lb_subflow.add(amphora_driver_tasks.AmphoraFinalize(
             name=sf_name + '-' + constants.AMPHORA_FINALIZE,
             requires=constants.AMPHORA))
@@ -290,7 +298,7 @@ class AmphoraFlows(object):
         return delete_amphora_flow
 
     def get_failover_flow(self, role=constants.ROLE_STANDALONE,
-                          load_balancer_id=None):
+                          load_balancer=None):
         """Creates a flow to failover a stale amphora
 
         :returns: The flow for amphora failover
@@ -334,7 +342,7 @@ class AmphoraFlows(object):
                 requires=constants.AMPHORA))
 
         # If this is an unallocated amp (spares pool), we're done
-        if not load_balancer_id:
+        if not load_balancer:
             failover_amphora_flow.add(
                 database_tasks.DisableAmphoraHealthMonitoring(
                     rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
@@ -373,9 +381,38 @@ class AmphoraFlows(object):
             provides=constants.AMPHORAE_NETWORK_CONFIG))
         failover_amphora_flow.add(database_tasks.GetListenersFromLoadbalancer(
             requires=constants.LOADBALANCER, provides=constants.LISTENERS))
+        failover_amphora_flow.add(database_tasks.GetAmphoraeFromLoadbalancer(
+            requires=constants.LOADBALANCER, provides=constants.AMPHORAE))
 
-        failover_amphora_flow.add(amphora_driver_tasks.ListenersUpdate(
-            requires=(constants.LOADBALANCER, constants.LISTENERS)))
+        # Listeners update needs to be run on all amphora to update
+        # their peer configurations. So parallelize this with an
+        # unordered subflow.
+        update_amps_subflow = unordered_flow.Flow(
+            constants.UPDATE_AMPS_SUBFLOW)
+
+        timeout_dict = {
+            constants.CONN_MAX_RETRIES:
+                CONF.haproxy_amphora.active_connection_max_retries,
+            constants.CONN_RETRY_INTERVAL:
+                CONF.haproxy_amphora.active_connection_rety_interval}
+
+        # Set up parallel flows for each amp. We don't know the new amp
+        # details at flow creation time, so set up a subflow for each
+        # amp on the LB; each lets the task index into a list of amps
+        # to find the amphora it should work on.
+        amp_index = 0
+        for amp in load_balancer.amphorae:
+            if amp.status == constants.DELETED:
+                continue
+            update_amps_subflow.add(
+                amphora_driver_tasks.AmpListenersUpdate(
+                    name=constants.AMP_LISTENER_UPDATE + '-' + str(amp_index),
+                    requires=(constants.LISTENERS, constants.AMPHORAE),
+                    inject={constants.AMPHORA_INDEX: amp_index,
+                            constants.TIMEOUT_DICT: timeout_dict}))
+            amp_index += 1
+
+        failover_amphora_flow.add(update_amps_subflow)
 
         # Plug the VIP ports into the new amphora
         failover_amphora_flow.add(network_tasks.PlugVIPPort(
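The subflow built above relies on taskflow running the children of an unordered_flow concurrently. A standalone sketch of that behavior — the Touch task is hypothetical; the taskflow API calls are real:

    # Standalone sketch (not Octavia code): children of an unordered_flow
    # may run in parallel when the engine uses a parallel executor, which
    # is what lets healthy amphorae be updated concurrently here.
    from taskflow import engines
    from taskflow import task
    from taskflow.patterns import unordered_flow


    class Touch(task.Task):
        def execute(self):
            print('updating %s' % self.name)


    flow = unordered_flow.Flow('octavia-update-amps-subflow')
    for amp_index in range(3):
        flow.add(Touch(name='octavia-amp-listeners-update-%d' % amp_index))

    engines.run(flow, engine='parallel')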
@@ -385,13 +422,22 @@ class AmphoraFlows(object):
                       constants.AMPHORAE_NETWORK_CONFIG)))
 
         # Plug the member networks into the new amphora
-        failover_amphora_flow.add(network_tasks.CalculateDelta(
-            requires=constants.LOADBALANCER, provides=constants.DELTAS))
-        failover_amphora_flow.add(network_tasks.HandleNetworkDeltas(
-            requires=constants.DELTAS, provides=constants.ADDED_PORTS))
+        failover_amphora_flow.add(network_tasks.CalculateAmphoraDelta(
+            requires=(constants.LOADBALANCER, constants.AMPHORA),
+            provides=constants.DELTA))
+
+        failover_amphora_flow.add(network_tasks.HandleNetworkDelta(
+            requires=(constants.AMPHORA, constants.DELTA),
+            provides=constants.ADDED_PORTS))
+
         failover_amphora_flow.add(amphora_driver_tasks.AmphoraePostNetworkPlug(
             requires=(constants.LOADBALANCER, constants.ADDED_PORTS)))
 
+        failover_amphora_flow.add(database_tasks.ReloadLoadBalancer(
+            name='octavia-failover-LB-reload-2',
+            requires=constants.LOADBALANCER_ID,
+            provides=constants.LOADBALANCER))
+
         # Handle the amphora role and VRRP if necessary
         if role == constants.ROLE_MASTER:
             failover_amphora_flow.add(database_tasks.MarkAmphoraMasterInDB(
@@ -412,7 +458,8 @@ class AmphoraFlows(object):
                 requires=constants.AMPHORA))
 
         failover_amphora_flow.add(amphora_driver_tasks.ListenersStart(
-            requires=(constants.LOADBALANCER, constants.LISTENERS)))
+            requires=(constants.LOADBALANCER, constants.LISTENERS,
+                      constants.AMPHORA)))
         failover_amphora_flow.add(
             database_tasks.DisableAmphoraHealthMonitoring(
                 rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
@@ -20,6 +20,7 @@ from stevedore import driver as stevedore_driver
 from taskflow import task
 from taskflow.types import failure
 
+from octavia.amphorae.driver_exceptions import exceptions as driver_except
 from octavia.common import constants
 from octavia.controller.worker import task_utils as task_utilities
 from octavia.db import api as db_apis
@@ -45,6 +46,25 @@ class BaseAmphoraTask(task.Task):
         self.task_utils = task_utilities.TaskUtils()
 
 
+class AmpListenersUpdate(BaseAmphoraTask):
+    """Task to update the listeners on one amphora."""
+
+    def execute(self, listeners, amphora_index, amphorae, timeout_dict=()):
+        # Note, we don't want this to cause a revert as it may be used
+        # in a failover flow with both amps failing. Skip it and let
+        # health manager fix it.
+        try:
+            self.amphora_driver.update_amphora_listeners(
+                listeners, amphora_index, amphorae, timeout_dict)
+        except Exception as e:
+            amphora_id = amphorae[amphora_index].id
+            LOG.error('Failed to update listeners on amphora %s. Skipping '
+                      'this amphora as it is failing to update due to: %s',
+                      amphora_id, str(e))
+            self.amphora_repo.update(db_apis.get_session(), amphora_id,
+                                     status=constants.ERROR)
+
+
 class ListenersUpdate(BaseAmphoraTask):
     """Task to update amphora with all specified listeners' configurations."""
@@ -104,10 +124,10 @@ class ListenerStart(BaseAmphoraTask):
 class ListenersStart(BaseAmphoraTask):
     """Task to start all listeners on the vip."""
 
-    def execute(self, loadbalancer, listeners):
+    def execute(self, loadbalancer, listeners, amphora=None):
         """Execute listener start routines for listeners on an amphora."""
         for listener in listeners:
-            self.amphora_driver.start(listener, loadbalancer.vip)
+            self.amphora_driver.start(listener, loadbalancer.vip, amphora)
         LOG.debug("Started the listeners on the vip")
 
     def revert(self, listeners, *args, **kwargs):
@@ -261,11 +281,27 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask):
     def execute(self, loadbalancer):
         """Execute post_vip_routine."""
         amps = []
+        timeout_dict = {
+            constants.CONN_MAX_RETRIES:
+                CONF.haproxy_amphora.active_connection_max_retries,
+            constants.CONN_RETRY_INTERVAL:
+                CONF.haproxy_amphora.active_connection_rety_interval}
         for amp in six.moves.filter(
                 lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
                 loadbalancer.amphorae):
             # Currently this is supported only with REST Driver
-            interface = self.amphora_driver.get_vrrp_interface(amp)
+            try:
+                interface = self.amphora_driver.get_vrrp_interface(
+                    amp, timeout_dict=timeout_dict)
+            except Exception as e:
+                # This can occur when an active/standby LB has no listener
+                LOG.error('Failed to get amphora VRRP interface on amphora '
+                          '%s. Skipping this amphora as it is failing due to: '
+                          '%s', amp.id, str(e))
+                self.amphora_repo.update(db_apis.get_session(), amp.id,
+                                         status=constants.ERROR)
+                continue
+
             self.amphora_repo.update(db_apis.get_session(), amp.id,
                                      vrrp_interface=interface)
             amps.append(self.amphora_repo.get(db_apis.get_session(),
@@ -317,3 +353,22 @@ class AmphoraVRRPStart(BaseAmphoraTask):
         self.amphora_driver.start_vrrp_service(loadbalancer)
         LOG.debug("Started VRRP of loadbalancer %s amphorae",
                   loadbalancer.id)
+
+
+class AmphoraComputeConnectivityWait(BaseAmphoraTask):
+    """Task to wait for the compute instance to be up."""
+
+    def execute(self, amphora):
+        """Execute get_info routine for an amphora until it responds."""
+        try:
+            amp_info = self.amphora_driver.get_info(amphora)
+            LOG.debug('Successfully connected to amphora %s: %s',
+                      amphora.id, amp_info)
+        except driver_except.TimeOutException:
+            LOG.error("Amphora compute instance failed to become reachable. "
+                      "This either means the compute driver failed to fully "
+                      "boot the instance inside the timeout interval or the "
+                      "instance is not reachable via the lb-mgmt-net.")
+            self.amphora_repo.update(db_apis.get_session(), amphora.id,
+                                     status=constants.ERROR)
+            raise
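Since this task calls get_info() with no timeout_dict, its wait budget is the driver's default retry loop. A rough sketch of the worst case — the option names exist in [haproxy_amphora]; the connection_max_retries default used here is an assumption for illustration (the retry interval default of 5 appears in the config hunk above):

    # Sketch: approximate worst-case wait before get_info() raises
    # TimeOutException under the boot-time (non fail-fast) settings.
    connection_max_retries = 300   # assumed default, not shown in this diff
    connection_retry_interval = 5  # default shown in the config hunk above
    max_wait_seconds = connection_max_retries * connection_retry_interval
    print(max_wait_seconds)  # 1500 seconds (25 minutes)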
@@ -175,7 +175,7 @@ class ComputeDelete(BaseComputeTask):
             raise
 
 
-class ComputeWait(BaseComputeTask):
+class ComputeActiveWait(BaseComputeTask):
     """Wait for the compute driver to mark the amphora active."""
 
     def execute(self, compute_id, amphora_id):
@@ -1525,6 +1525,25 @@ class GetAmphoraDetails(BaseDatabaseTask):
                                      vrrp_priority=amphora.vrrp_priority)
 
 
+class GetAmphoraeFromLoadbalancer(BaseDatabaseTask):
+    """Task to pull the amphorae from a loadbalancer."""
+
+    def execute(self, loadbalancer):
+        """Pull the amphorae from a loadbalancer.
+
+        :param loadbalancer: Load balancer which amphorae are required
+        :returns: A list of Amphora objects
+        """
+        amphorae = []
+        for amp in loadbalancer.amphorae:
+            a = self.amphora_repo.get(db_apis.get_session(), id=amp.id,
+                                      show_deleted=False)
+            if a is None:
+                continue
+            amphorae.append(a)
+        return amphorae
+
+
 class GetListenersFromLoadbalancer(BaseDatabaseTask):
     """Task to pull the listeners from a loadbalancer."""
@@ -1537,6 +1556,7 @@ class GetListenersFromLoadbalancer(BaseDatabaseTask):
         listeners = []
         for listener in loadbalancer.listeners:
             l = self.listener_repo.get(db_apis.get_session(), id=listener.id)
+            l.load_balancer = loadbalancer
             listeners.append(l)
         return listeners
@@ -215,6 +215,55 @@ class GetMemberPorts(BaseNetworkTask):
         return member_ports
 
 
+class HandleNetworkDelta(BaseNetworkTask):
+    """Task to plug and unplug networks
+
+    Plug or unplug networks based on delta
+    """
+
+    def execute(self, amphora, delta):
+        """Handle network plugging based off deltas."""
+        added_ports = {}
+        added_ports[amphora.id] = []
+        for nic in delta.add_nics:
+            interface = self.network_driver.plug_network(delta.compute_id,
+                                                         nic.network_id)
+            port = self.network_driver.get_port(interface.port_id)
+            port.network = self.network_driver.get_network(port.network_id)
+            for fixed_ip in port.fixed_ips:
+                fixed_ip.subnet = self.network_driver.get_subnet(
+                    fixed_ip.subnet_id)
+            added_ports[amphora.id].append(port)
+        for nic in delta.delete_nics:
+            try:
+                self.network_driver.unplug_network(delta.compute_id,
+                                                   nic.network_id)
+            except base.NetworkNotFound:
+                LOG.debug("Network %s not found", nic.network_id)
+            except Exception:
+                LOG.exception("Unable to unplug network")
+        return added_ports
+
+    def revert(self, result, amphora, delta, *args, **kwargs):
+        """Handle a network plug or unplug failures."""
+
+        if isinstance(result, failure.Failure):
+            return
+
+        if not delta:
+            return
+
+        LOG.warning("Unable to plug networks for amp id %s",
+                    delta.amphora_id)
+
+        for nic in delta.add_nics:
+            try:
+                self.network_driver.unplug_network(delta.compute_id,
+                                                   nic.network_id)
+            except Exception:
+                pass
+
+
 class HandleNetworkDeltas(BaseNetworkTask):
     """Task to plug and unplug networks
@@ -24,6 +24,7 @@ import six
 from octavia.amphorae.driver_exceptions import exceptions as driver_except
 from octavia.amphorae.drivers.haproxy import exceptions as exc
 from octavia.amphorae.drivers.haproxy import rest_api_driver as driver
+from octavia.common import constants
 from octavia.db import models
 from octavia.network import data_models as network_models
 from octavia.tests.unit import base
@@ -47,6 +48,9 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
     def setUp(self):
         super(TestHaproxyAmphoraLoadBalancerDriverTest, self).setUp()
 
+        conf = oslo_fixture.Config(cfg.CONF)
+        conf.config(group="haproxy_amphora", user_group="everyone")
+
         DEST1 = '198.51.100.0/24'
         DEST2 = '203.0.113.0/24'
         NEXTHOP = '192.0.2.1'
@@ -84,6 +88,50 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
                             'mtu': FAKE_MTU,
                             'host_routes': host_routes_data}
 
+        self.timeout_dict = {constants.REQ_CONN_TIMEOUT: 1,
+                             constants.REQ_READ_TIMEOUT: 2,
+                             constants.CONN_MAX_RETRIES: 3,
+                             constants.CONN_RETRY_INTERVAL: 4}
+
+    @mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
+    def test_update_amphora_listeners(self, mock_load_cert):
+        mock_amphora = mock.MagicMock()
+        mock_amphora.id = uuidutils.generate_uuid()
+        mock_listener = mock.MagicMock()
+        mock_listener.id = uuidutils.generate_uuid()
+        mock_load_cert.return_value = {'tls_cert': None, 'sni_certs': []}
+        self.driver.jinja.build_config.return_value = 'the_config'
+
+        self.driver.update_amphora_listeners(None, 1, [],
+                                             self.timeout_dict)
+        mock_load_cert.assert_not_called()
+        self.driver.jinja.build_config.assert_not_called()
+        self.driver.client.upload_config.assert_not_called()
+        self.driver.client.reload_listener.assert_not_called()
+
+        self.driver.update_amphora_listeners([mock_listener], 0,
+                                             [mock_amphora], self.timeout_dict)
+        self.driver.jinja.build_config.assert_called_once_with(
+            host_amphora=mock_amphora, listener=mock_listener,
+            tls_cert=None, user_group="everyone")
+        self.driver.client.upload_config.assert_called_once_with(
+            mock_amphora, mock_listener.id, 'the_config',
+            timeout_dict=self.timeout_dict)
+        self.driver.client.reload_listener.assert_called_once_with(
+            mock_amphora, mock_listener.id, timeout_dict=self.timeout_dict)
+
+        mock_load_cert.reset_mock()
+        self.driver.jinja.build_config.reset_mock()
+        self.driver.client.upload_config.reset_mock()
+        self.driver.client.reload_listener.reset_mock()
+        mock_amphora.status = constants.DELETED
+        self.driver.update_amphora_listeners([mock_listener], 0,
+                                             [mock_amphora], self.timeout_dict)
+        mock_load_cert.assert_not_called()
+        self.driver.jinja.build_config.assert_not_called()
+        self.driver.client.upload_config.assert_not_called()
+        self.driver.client.reload_listener.assert_not_called()
+
     @mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
     @mock.patch('octavia.common.tls_utils.cert_parser.get_host_names')
     def test_update(self, mock_cert, mock_load_crt):
@@ -157,11 +205,29 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
             self.amp, self.sl.id)
 
     def test_start(self):
+        amp1 = mock.MagicMock()
+        amp2 = mock.MagicMock()
+        amp2.status = constants.DELETED
+        listener = mock.MagicMock()
+        listener.id = uuidutils.generate_uuid()
+        listener.load_balancer.amphorae = [amp1, amp2]
         # Execute driver method
-        self.driver.start(self.sl, self.sv)
+        self.driver.start(listener, self.sv)
         self.driver.client.start_listener.assert_called_once_with(
+            amp1, listener.id)
+
+    def test_start_with_amphora(self):
+        # Execute driver method
+        amp = mock.MagicMock()
+        self.driver.start(self.sl, self.sv, self.amp)
+        self.driver.client.start_listener.assert_called_once_with(
             self.amp, self.sl.id)
+
+        self.driver.client.start_listener.reset_mock()
+        amp.status = constants.DELETED
+        self.driver.start(self.sl, self.sv, amp)
+        self.driver.client.start_listener.assert_not_called()
 
     def test_delete(self):
         # Execute driver method
         self.driver.delete(self.sl, self.sv)
@@ -169,13 +235,19 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
             self.amp, self.sl.id)
 
     def test_get_info(self):
-        pass
+        self.driver.client.get_info.return_value = 'FAKE_INFO'
+        result = self.driver.get_info(self.amp)
+        self.assertEqual('FAKE_INFO', result)
 
     def test_get_diagnostics(self):
-        pass
+        # TODO(johnsom) Implement once this exists on the amphora agent.
+        result = self.driver.get_diagnostics(self.amp)
+        self.assertIsNone(result)
 
     def test_finalize_amphora(self):
-        pass
+        # TODO(johnsom) Implement once this exists on the amphora agent.
+        result = self.driver.finalize_amphora(self.amp)
+        self.assertIsNone(result)
 
     def test_post_vip_plug(self):
         amphorae_network_config = mock.MagicMock()
@@ -249,7 +321,7 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
     def test_get_vrrp_interface(self):
         self.driver.get_vrrp_interface(self.amp)
         self.driver.client.get_interface.assert_called_once_with(
-            self.amp, self.amp.vrrp_ip)
+            self.amp, self.amp.vrrp_ip, timeout_dict=None)
 
 
 class TestAmphoraAPIClientTest(base.TestCase):
@@ -270,6 +342,10 @@ class TestAmphoraAPIClientTest(base.TestCase):
                           'vrrp_ip': self.amp.vrrp_ip}
         patcher = mock.patch('time.sleep').start()
         self.addCleanup(patcher.stop)
+        self.timeout_dict = {constants.REQ_CONN_TIMEOUT: 1,
+                             constants.REQ_READ_TIMEOUT: 2,
+                             constants.CONN_MAX_RETRIES: 3,
+                             constants.CONN_RETRY_INTERVAL: 4}
 
     def test_base_url(self):
         url = self.driver._base_url(FAKE_IP)
@@ -283,8 +359,8 @@ class TestAmphoraAPIClientTest(base.TestCase):
     @mock.patch('octavia.amphorae.drivers.haproxy.rest_api_driver.time.sleep')
     def test_request(self, mock_sleep, mock_get):
         self.assertRaises(driver_except.TimeOutException,
-                          self.driver.request,
-                          'get', self.amp, 'unavailableURL')
+                          self.driver.request, 'get', self.amp,
+                          'unavailableURL', self.timeout_dict)
 
     @requests_mock.mock()
     def test_get_info(self, m):
@@ -15,6 +15,7 @@
 from oslo_utils import uuidutils
 
 from octavia.amphorae.drivers.noop_driver import driver
+from octavia.common import constants
 from octavia.common import data_models
 from octavia.network import data_models as network_models
 from octavia.tests.unit import base
@@ -44,6 +45,7 @@ class TestNoopAmphoraLoadBalancerDriver(base.TestCase):
         super(TestNoopAmphoraLoadBalancerDriver, self).setUp()
         self.driver = driver.NoopAmphoraLoadBalancerDriver()
         self.listener = data_models.Listener()
+        self.listener.id = uuidutils.generate_uuid()
         self.listener.protocol_port = 80
         self.vip = data_models.Vip()
         self.vip.ip_address = "10.0.0.1"
@@ -61,6 +63,19 @@ class TestNoopAmphoraLoadBalancerDriver(base.TestCase):
                 vip_subnet=network_models.Subnet(id=self.FAKE_UUID_1))
         }
         self.pem_file = 'test_pem_file'
+        self.timeout_dict = {constants.REQ_CONN_TIMEOUT: 1,
+                             constants.REQ_READ_TIMEOUT: 2,
+                             constants.CONN_MAX_RETRIES: 3,
+                             constants.CONN_RETRY_INTERVAL: 4}
+
+    def test_update_amphora_listeners(self):
+        self.driver.update_amphora_listeners([self.listener], self.amphora.id,
+                                             self.timeout_dict)
+        self.assertEqual((self.listener, self.amphora.id, self.timeout_dict,
+                          'update_amp'),
+                         self.driver.driver.amphoraconfig[(
+                             self.listener.id,
+                             self.amphora.id)])
 
     def test_update(self):
         self.driver.update(self.listener, self.vip)
@@ -19,6 +19,7 @@ from oslo_config import fixture as oslo_fixture
 from taskflow.patterns import linear_flow as flow
 
 from octavia.common import constants
+from octavia.common import data_models
 from octavia.controller.worker.flows import amphora_flows
 import octavia.tests.unit.base as base
@@ -38,6 +39,11 @@ class TestAmphoraFlows(base.TestCase):
             amphora_driver='amphora_haproxy_rest_driver')
         self.conf.config(group="nova", enable_anti_affinity=False)
         self.AmpFlow = amphora_flows.AmphoraFlows()
+        self.amp1 = data_models.Amphora(id=1)
+        self.amp2 = data_models.Amphora(id=2)
+        self.amp3 = data_models.Amphora(id=3, status=constants.DELETED)
+        self.lb = data_models.LoadBalancer(
+            id=4, amphorae=[self.amp1, self.amp2, self.amp3])
 
     def test_get_create_amphora_flow(self, mock_get_net_driver):
@@ -237,7 +243,7 @@ class TestAmphoraFlows(base.TestCase):
     def test_get_failover_flow_allocated(self, mock_get_net_driver):
 
         amp_flow = self.AmpFlow.get_failover_flow(
-            load_balancer_id='mylb')
+            load_balancer=self.lb)
 
         self.assertIsInstance(amp_flow, flow.Flow)
@@ -254,10 +260,10 @@ class TestAmphoraFlows(base.TestCase):
         self.assertIn(constants.LOADBALANCER, amp_flow.provides)
 
         self.assertEqual(3, len(amp_flow.requires))
-        self.assertEqual(11, len(amp_flow.provides))
+        self.assertEqual(12, len(amp_flow.provides))
 
         amp_flow = self.AmpFlow.get_failover_flow(
-            role=constants.ROLE_MASTER, load_balancer_id='mylb')
+            role=constants.ROLE_MASTER, load_balancer=self.lb)
 
         self.assertIsInstance(amp_flow, flow.Flow)
@@ -274,10 +280,10 @@ class TestAmphoraFlows(base.TestCase):
         self.assertIn(constants.LOADBALANCER, amp_flow.provides)
 
         self.assertEqual(3, len(amp_flow.requires))
-        self.assertEqual(11, len(amp_flow.provides))
+        self.assertEqual(12, len(amp_flow.provides))
 
         amp_flow = self.AmpFlow.get_failover_flow(
-            role=constants.ROLE_BACKUP, load_balancer_id='mylb')
+            role=constants.ROLE_BACKUP, load_balancer=self.lb)
 
         self.assertIsInstance(amp_flow, flow.Flow)
@@ -294,10 +300,10 @@ class TestAmphoraFlows(base.TestCase):
         self.assertIn(constants.LOADBALANCER, amp_flow.provides)
 
         self.assertEqual(3, len(amp_flow.requires))
-        self.assertEqual(11, len(amp_flow.provides))
+        self.assertEqual(12, len(amp_flow.provides))
 
         amp_flow = self.AmpFlow.get_failover_flow(
-            role='BOGUSROLE', load_balancer_id='mylb')
+            role='BOGUSROLE', load_balancer=self.lb)
 
         self.assertIsInstance(amp_flow, flow.Flow)
@@ -314,12 +320,11 @@ class TestAmphoraFlows(base.TestCase):
         self.assertIn(constants.LOADBALANCER, amp_flow.provides)
 
         self.assertEqual(3, len(amp_flow.requires))
-        self.assertEqual(11, len(amp_flow.provides))
+        self.assertEqual(12, len(amp_flow.provides))
 
     def test_get_failover_flow_spare(self, mock_get_net_driver):
 
-        amp_flow = self.AmpFlow.get_failover_flow(
-            load_balancer_id=None)
+        amp_flow = self.AmpFlow.get_failover_flow()
 
         self.assertIsInstance(amp_flow, flow.Flow)
@@ -14,9 +14,12 @@
 #
 
 import mock
+from oslo_config import cfg
+from oslo_config import fixture as oslo_fixture
 from oslo_utils import uuidutils
 from taskflow.types import failure
 
+from octavia.amphorae.driver_exceptions import exceptions as driver_except
 from octavia.common import constants
 from octavia.common import data_models
 from octavia.controller.worker.tasks import amphora_driver_tasks
@@ -28,6 +31,8 @@ AMP_ID = uuidutils.generate_uuid()
 COMPUTE_ID = uuidutils.generate_uuid()
 LISTENER_ID = uuidutils.generate_uuid()
 LB_ID = uuidutils.generate_uuid()
+CONN_MAX_RETRIES = 10
+CONN_RETRY_INTERVAL = 6
 
 _amphora_mock = mock.MagicMock()
 _amphora_mock.id = AMP_ID
@@ -61,8 +66,42 @@ class TestAmphoraDriverTasks(base.TestCase):
 
         _LB_mock.amphorae = [_amphora_mock]
         _LB_mock.id = LB_ID
+        conf = oslo_fixture.Config(cfg.CONF)
+        conf.config(group="haproxy_amphora",
+                    active_connection_max_retries=CONN_MAX_RETRIES)
+        conf.config(group="haproxy_amphora",
+                    active_connection_rety_interval=CONN_RETRY_INTERVAL)
         super(TestAmphoraDriverTasks, self).setUp()
 
+    def test_amp_listener_update(self,
+                                 mock_driver,
+                                 mock_generate_uuid,
+                                 mock_log,
+                                 mock_get_session,
+                                 mock_listener_repo_get,
+                                 mock_listener_repo_update,
+                                 mock_amphora_repo_update):
+
+        timeout_dict = {constants.REQ_CONN_TIMEOUT: 1,
+                        constants.REQ_READ_TIMEOUT: 2,
+                        constants.CONN_MAX_RETRIES: 3,
+                        constants.CONN_RETRY_INTERVAL: 4}
+
+        amp_list_update_obj = amphora_driver_tasks.AmpListenersUpdate()
+        amp_list_update_obj.execute([_listener_mock], 0,
+                                    [_amphora_mock], timeout_dict)
+
+        mock_driver.update_amphora_listeners.assert_called_once_with(
+            [_listener_mock], 0, [_amphora_mock], timeout_dict)
+
+        mock_driver.update_amphora_listeners.side_effect = Exception('boom')
+
+        amp_list_update_obj.execute([_listener_mock], 0,
+                                    [_amphora_mock], timeout_dict)
+
+        mock_amphora_repo_update.assert_called_once_with(
+            _session_mock, AMP_ID, status=constants.ERROR)
+
     def test_listener_update(self,
                              mock_driver,
                              mock_generate_uuid,
@@ -480,10 +519,15 @@ class TestAmphoraDriverTasks(base.TestCase):
                                             mock_listener_repo_update,
                                             mock_amphora_repo_update):
         _LB_mock.amphorae = _amphorae_mock
+
+        timeout_dict = {constants.CONN_MAX_RETRIES: CONN_MAX_RETRIES,
+                        constants.CONN_RETRY_INTERVAL: CONN_RETRY_INTERVAL}
+
         amphora_update_vrrp_interface_obj = (
             amphora_driver_tasks.AmphoraUpdateVRRPInterface())
         amphora_update_vrrp_interface_obj.execute(_LB_mock)
-        mock_driver.get_vrrp_interface.assert_called_once_with(_amphora_mock)
+        mock_driver.get_vrrp_interface.assert_called_once_with(
+            _amphora_mock, timeout_dict=timeout_dict)
 
         # Test revert
         mock_driver.reset_mock()
@@ -550,3 +594,22 @@ class TestAmphoraDriverTasks(base.TestCase):
             amphora_driver_tasks.AmphoraVRRPStart())
         amphora_vrrp_start_obj.execute(_LB_mock)
         mock_driver.start_vrrp_service.assert_called_once_with(_LB_mock)
+
+    def test_amphora_compute_connectivity_wait(self,
+                                               mock_driver,
+                                               mock_generate_uuid,
+                                               mock_log,
+                                               mock_get_session,
+                                               mock_listener_repo_get,
+                                               mock_listener_repo_update,
+                                               mock_amphora_repo_update):
+        amp_compute_conn_wait_obj = (
+            amphora_driver_tasks.AmphoraComputeConnectivityWait())
+        amp_compute_conn_wait_obj.execute(_amphora_mock)
+        mock_driver.get_info.assert_called_once_with(_amphora_mock)
+
+        mock_driver.get_info.side_effect = driver_except.TimeOutException()
+        self.assertRaises(driver_except.TimeOutException,
+                          amp_compute_conn_wait_obj.execute, _amphora_mock)
+        mock_amphora_repo_update.assert_called_once_with(
+            _session_mock, AMP_ID, status=constants.ERROR)
@@ -339,7 +339,7 @@ class TestComputeTasks(base.TestCase):
 
         mock_driver.get_amphora.return_value = _amphora_mock, None
 
-        computewait = compute_tasks.ComputeWait()
+        computewait = compute_tasks.ComputeActiveWait()
         computewait.execute(COMPUTE_ID, AMPHORA_ID)
 
         mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID)
@@ -366,7 +366,7 @@ class TestComputeTasks(base.TestCase):
 
         mock_driver.get_amphora.return_value = _amphora_mock, None
 
-        computewait = compute_tasks.ComputeWait()
+        computewait = compute_tasks.ComputeActiveWait()
         computewait.execute(COMPUTE_ID, AMPHORA_ID)
 
         mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID)
@@ -391,7 +391,7 @@ class TestComputeTasks(base.TestCase):
 
         mock_driver.get_amphora.return_value = _amphora_mock, None
 
-        computewait = compute_tasks.ComputeWait()
+        computewait = compute_tasks.ComputeActiveWait()
         computewait.execute(COMPUTE_ID, AMPHORA_ID)
 
         mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID)
@@ -1766,6 +1766,59 @@ class TestDatabaseTasks(base.TestCase):
         repo.AmphoraRepository.update.assert_called_once_with(
             'TEST', AMP_ID, role=None, vrrp_priority=None)
 
+    @mock.patch('octavia.db.repositories.AmphoraRepository.get')
+    def test_get_amphorae_from_loadbalancer(self,
+                                            mock_amphora_get,
+                                            mock_generate_uuid,
+                                            mock_LOG,
+                                            mock_get_session,
+                                            mock_loadbalancer_repo_update,
+                                            mock_listener_repo_update,
+                                            mock_amphora_repo_update,
+                                            mock_amphora_repo_delete):
+        amp1 = mock.MagicMock()
+        amp1.id = uuidutils.generate_uuid()
+        amp2 = mock.MagicMock()
+        amp2.id = uuidutils.generate_uuid()
+        lb = mock.MagicMock()
+        lb.amphorae = [amp1, amp2]
+
+        mock_amphora_get.side_effect = [_amphora_mock, None]
+
+        get_amps_from_lb_obj = database_tasks.GetAmphoraeFromLoadbalancer()
+        result = get_amps_from_lb_obj.execute(lb)
+        self.assertEqual([_amphora_mock], result)
+
+    @mock.patch('octavia.db.repositories.ListenerRepository.get')
+    def test_get_listeners_from_loadbalancer(self,
+                                             mock_listener_get,
+                                             mock_generate_uuid,
+                                             mock_LOG,
+                                             mock_get_session,
+                                             mock_loadbalancer_repo_update,
+                                             mock_listener_repo_update,
+                                             mock_amphora_repo_update,
+                                             mock_amphora_repo_delete):
+        mock_listener_get.return_value = _listener_mock
+        _loadbalancer_mock.listeners = [_listener_mock]
+        get_list_from_lb_obj = database_tasks.GetListenersFromLoadbalancer()
+        result = get_list_from_lb_obj.execute(_loadbalancer_mock)
+        mock_listener_get.assert_called_once_with('TEST', id=_listener_mock.id)
+        self.assertEqual([_listener_mock], result)
+
     def test_get_vip_from_loadbalancer(self,
                                        mock_generate_uuid,
                                        mock_LOG,
                                        mock_get_session,
                                        mock_loadbalancer_repo_update,
                                        mock_listener_repo_update,
                                        mock_amphora_repo_update,
                                        mock_amphora_repo_delete):
         _loadbalancer_mock.vip = _vip_mock
         get_vip_from_lb_obj = database_tasks.GetVipFromLoadbalancer()
         result = get_vip_from_lb_obj.execute(_loadbalancer_mock)
         self.assertEqual(_vip_mock, result)
 
     @mock.patch('octavia.db.repositories.VRRPGroupRepository.create')
     def test_create_vrrp_group_for_lb(self,
                                       mock_vrrp_group_create,
@@ -17,6 +17,7 @@ import mock
 from oslo_config import cfg
 from oslo_config import fixture as oslo_fixture
 from oslo_utils import uuidutils
+from taskflow.types import failure
 
 from octavia.common import constants
 from octavia.common import data_models as o_data_models
@@ -296,6 +297,69 @@ class TestNetworkTasks(base.TestCase):
 
         self.assertEqual([port_mock], ports)
 
+    def test_handle_network_delta(self, mock_get_net_driver):
+        mock_net_driver = mock.MagicMock()
+        mock_get_net_driver.return_value = mock_net_driver
+
+        nic1 = mock.MagicMock()
+        nic1.network_id = uuidutils.generate_uuid()
+        nic2 = mock.MagicMock()
+        nic2.network_id = uuidutils.generate_uuid()
+        interface1 = mock.MagicMock()
+        interface1.port_id = uuidutils.generate_uuid()
+        port1 = mock.MagicMock()
+        port1.network_id = uuidutils.generate_uuid()
+        fixed_ip = mock.MagicMock()
+        fixed_ip.subnet_id = uuidutils.generate_uuid()
+        port1.fixed_ips = [fixed_ip]
+        subnet = mock.MagicMock()
+        network = mock.MagicMock()
+
+        delta = data_models.Delta(amphora_id=self.amphora_mock.id,
+                                  compute_id=self.amphora_mock.compute_id,
+                                  add_nics=[nic1],
+                                  delete_nics=[nic2, nic2, nic2])
+
+        mock_net_driver.plug_network.return_value = interface1
+        mock_net_driver.get_port.return_value = port1
+        mock_net_driver.get_network.return_value = network
+        mock_net_driver.get_subnet.return_value = subnet
+
+        mock_net_driver.unplug_network.side_effect = [
+            None, net_base.NetworkNotFound, Exception]
+
+        handle_net_delta_obj = network_tasks.HandleNetworkDelta()
+        result = handle_net_delta_obj.execute(self.amphora_mock, delta)
+
+        mock_net_driver.plug_network.assert_called_once_with(
+            self.amphora_mock.compute_id, nic1.network_id)
+        mock_net_driver.get_port.assert_called_once_with(interface1.port_id)
+        mock_net_driver.get_network.assert_called_once_with(port1.network_id)
+        mock_net_driver.get_subnet.assert_called_once_with(fixed_ip.subnet_id)
+
+        self.assertEqual({self.amphora_mock.id: [port1]}, result)
+
+        mock_net_driver.unplug_network.assert_called_with(
+            self.amphora_mock.compute_id, nic2.network_id)
+
+        # Revert
+        delta2 = data_models.Delta(amphora_id=self.amphora_mock.id,
+                                   compute_id=self.amphora_mock.compute_id,
+                                   add_nics=[nic1, nic1],
+                                   delete_nics=[nic2, nic2, nic2])
+
+        mock_net_driver.unplug_network.reset_mock()
+        handle_net_delta_obj.revert(
+            failure.Failure.from_exception(Exception('boom')), None, None)
+        mock_net_driver.unplug_network.assert_not_called()
+
+        mock_net_driver.unplug_network.reset_mock()
+        handle_net_delta_obj.revert(None, None, None)
+        mock_net_driver.unplug_network.assert_not_called()
+
+        mock_net_driver.unplug_network.reset_mock()
+        handle_net_delta_obj.revert(None, None, delta2)
+
     def test_handle_network_deltas(self, mock_get_net_driver):
         mock_driver = mock.MagicMock()
         mock_get_net_driver.return_value = mock_driver
@@ -56,10 +56,20 @@ _l7rule_mock = mock.MagicMock()
 _create_map_flow_mock = mock.MagicMock()
 _amphora_mock.load_balancer_id = LB_ID
 _amphora_mock.id = AMP_ID
+_db_session = mock.MagicMock()
 
 CONF = cfg.CONF
 
 
+class TestException(Exception):
+
+    def __init__(self, value):
+        self.value = value
+
+    def __str__(self):
+        return repr(self.value)
+
+
 @mock.patch('octavia.db.repositories.AmphoraRepository.get',
             return_value=_amphora_mock)
 @mock.patch('octavia.db.repositories.HealthMonitorRepository.get',
@@ -79,7 +89,7 @@ CONF = cfg.CONF
 @mock.patch('octavia.common.base_taskflow.BaseTaskFlowEngine._taskflow_load',
             return_value=_flow_mock)
 @mock.patch('taskflow.listeners.logging.DynamicLoggingListener')
-@mock.patch('octavia.db.api.get_session', return_value='TEST')
+@mock.patch('octavia.db.api.get_session', return_value=_db_session)
 class TestControllerWorker(base.TestCase):
 
     def setUp(self):
@@ -162,7 +172,7 @@ class TestControllerWorker(base.TestCase):
         cw.delete_amphora(AMP_ID)
 
         mock_amp_repo_get.assert_called_once_with(
-            'TEST',
+            _db_session,
             id=AMP_ID)
 
         (base_taskflow.BaseTaskFlowEngine._taskflow_load.
@@ -583,7 +593,7 @@ class TestControllerWorker(base.TestCase):
         cw.delete_load_balancer(LB_ID, cascade=False)
 
         mock_lb_repo_get.assert_called_once_with(
-            'TEST',
+            _db_session,
             id=LB_ID)
 
         (base_taskflow.BaseTaskFlowEngine._taskflow_load.
@@ -621,7 +631,7 @@ class TestControllerWorker(base.TestCase):
         cw.delete_load_balancer(LB_ID, cascade=True)
 
         mock_lb_repo_get.assert_called_once_with(
-            'TEST',
+            _db_session,
             id=LB_ID)
 
         (base_taskflow.BaseTaskFlowEngine._taskflow_load.
@@ -663,7 +673,7 @@ class TestControllerWorker(base.TestCase):
         cw.update_load_balancer(LB_ID, change)
 
         mock_lb_repo_get.assert_called_once_with(
-            'TEST',
+            _db_session,
             id=LB_ID)
 
         (base_taskflow.BaseTaskFlowEngine._taskflow_load.
@@ -1152,9 +1162,81 @@ class TestControllerWorker(base.TestCase):
                        }))
 
         _flow_mock.run.assert_called_once_with()
-        mock_update.assert_called_with('TEST', LB_ID,
+        mock_update.assert_called_with(_db_session, LB_ID,
                                        provisioning_status=constants.ACTIVE)
 
+    @mock.patch('octavia.controller.worker.controller_worker.ControllerWorker.'
+                '_perform_amphora_failover')
+    def test_failover_amp_missing_amp(self,
+                                      mock_perform_amp_failover,
+                                      mock_api_get_session,
+                                      mock_dyn_log_listener,
+                                      mock_taskflow_load,
+                                      mock_pool_repo_get,
+                                      mock_member_repo_get,
+                                      mock_l7rule_repo_get,
+                                      mock_l7policy_repo_get,
+                                      mock_listener_repo_get,
+                                      mock_lb_repo_get,
+                                      mock_health_mon_repo_get,
+                                      mock_amp_repo_get):
+
+        mock_amp_repo_get.return_value = None
+
+        cw = controller_worker.ControllerWorker()
+        cw.failover_amphora(AMP_ID)
+
+        mock_perform_amp_failover.assert_not_called()
+
+    @mock.patch('octavia.controller.worker.controller_worker.ControllerWorker.'
+                '_perform_amphora_failover')
+    def test_failover_amp_flow_exception(self,
+                                         mock_perform_amp_failover,
+                                         mock_api_get_session,
+                                         mock_dyn_log_listener,
+                                         mock_taskflow_load,
+                                         mock_pool_repo_get,
+                                         mock_member_repo_get,
+                                         mock_l7rule_repo_get,
+                                         mock_l7policy_repo_get,
+                                         mock_listener_repo_get,
+                                         mock_lb_repo_get,
+                                         mock_health_mon_repo_get,
+                                         mock_amp_repo_get):
+
+        mock_perform_amp_failover.side_effect = TestException('boom')
+        cw = controller_worker.ControllerWorker()
+        self.assertRaises(TestException, cw.failover_amphora, AMP_ID)
+
+    @mock.patch('octavia.controller.worker.controller_worker.ControllerWorker.'
+                '_perform_amphora_failover')
+    @mock.patch('octavia.db.repositories.LoadBalancerRepository.update')
+    def test_failover_amp_no_lb(self,
+                                mock_lb_update,
+                                mock_perform_amp_failover,
+                                mock_api_get_session,
+                                mock_dyn_log_listener,
+                                mock_taskflow_load,
+                                mock_pool_repo_get,
+                                mock_member_repo_get,
+                                mock_l7rule_repo_get,
+                                mock_l7policy_repo_get,
+                                mock_listener_repo_get,
+                                mock_lb_repo_get,
+                                mock_health_mon_repo_get,
+                                mock_amp_repo_get):
+
+        amphora = mock.MagicMock()
+        amphora.load_balancer_id = None
+        mock_amp_repo_get.return_value = amphora
+
+        cw = controller_worker.ControllerWorker()
+        cw.failover_amphora(AMP_ID)
+
+        mock_lb_update.assert_not_called()
+        mock_perform_amp_failover.assert_called_once_with(
+            amphora, constants.LB_CREATE_FAILOVER_PRIORITY)
+
     @mock.patch('octavia.db.repositories.AmphoraHealthRepository.delete')
     def test_failover_deleted_amphora(self,
                                       mock_delete,
@@ -1178,7 +1260,7 @@ class TestControllerWorker(base.TestCase):
         cw = controller_worker.ControllerWorker()
         cw._perform_amphora_failover(mock_amphora, 10)
 
-        mock_delete.assert_called_with('TEST', amphora_id=AMP_ID)
+        mock_delete.assert_called_with(_db_session, amphora_id=AMP_ID)
         mock_taskflow_load.assert_not_called()
 
     @mock.patch('octavia.controller.worker.'
@@ -1207,7 +1289,7 @@ class TestControllerWorker(base.TestCase):
         cw.failover_loadbalancer('123')
         mock_perform.assert_called_with(
             _amphora_mock2, constants.LB_CREATE_ADMIN_FAILOVER_PRIORITY)
-        mock_update.assert_called_with('TEST', '123',
+        mock_update.assert_called_with(_db_session, '123',
                                        provisioning_status=constants.ACTIVE)
 
         mock_perform.reset
@@ -1219,13 +1301,13 @@ class TestControllerWorker(base.TestCase):
         # is the last one
         mock_perform.assert_called_with(
             _amphora_mock, constants.LB_CREATE_ADMIN_FAILOVER_PRIORITY)
-        mock_update.assert_called_with('TEST', '123',
+        mock_update.assert_called_with(_db_session, '123',
                                        provisioning_status=constants.ACTIVE)
 
         mock_perform.reset
         mock_perform.side_effect = OverflowError()
         self.assertRaises(OverflowError, cw.failover_loadbalancer, 123)
-        mock_update.assert_called_with('TEST', 123,
+        mock_update.assert_called_with(_db_session, 123,
                                        provisioning_status=constants.ERROR)
 
     @mock.patch('octavia.controller.worker.flows.'
@@ -1270,7 +1352,7 @@ class TestControllerWorker(base.TestCase):
                        }))
 
         _flow_mock.run.assert_called_once_with()
-        mock_update.assert_called_with('TEST', LB_ID,
+        mock_update.assert_called_with(_db_session, LB_ID,
                                        provisioning_status=constants.ACTIVE)
 
     @mock.patch('octavia.controller.worker.flows.'
@@ -0,0 +1,5 @@
+---
+fixes:
+  - |
+    Fixes an issue where if more than one amphora fails at the same time,
+    failover might not fully complete, leaving the load balancer in ERROR.
@@ -53,9 +53,12 @@ def generate(flow_list, output_directory):
         get_flow_method = getattr(current_instance, current_tuple[2])
         if (current_tuple[1] == 'AmphoraFlows' and
                 current_tuple[2] == 'get_failover_flow'):
+            amp1 = dmh.generate_amphora()
+            amp2 = dmh.generate_amphora()
+            lb = dmh.generate_load_balancer(amphorae=[amp1, amp2])
             current_engine = engines.load(
                 get_flow_method(role=constants.ROLE_STANDALONE,
-                                load_balancer_id=None))
+                                load_balancer=lb))
         elif (current_tuple[1] == 'LoadBalancerFlows' and
               current_tuple[2] == 'get_create_load_balancer_flow'):
             current_engine = engines.load(