Fix failover when multiple amphora have failed

If a load balancer loses more than one amphora at the same time
the failover process will fail and leave the load balancer in
provisioning status ERROR.

This patch resolves this by failing over one amphora at a time
marking any amphora that are also failed in status ERROR. The health
manager will then failover the other failed amphora in subsequent checks.

This patch will update multiple healthy amphora in parallel and will
timeout failed amphroa using the new "active_connection_max_retries"
configuration setting used for "fail-fast" connections.

The patch also updates the amphora failover flow documentation to
show the full flow and not just the spares failover flow.

It updates the amphora driver "get_diagnostics" method to pass instead
of error.

It also adds a AmphoraComputeConnectivityWait task to explicitly wait
for a compute instance to come up and be reachable. This allows a longer
timeout and clarifies this may fail due to compute (nova) failures.
Previously the first plug vip task would do this wait.

Change-Id: Ief97ddda8261b5bbc54c6824f90ae9c7a2d81701
Story: 2001481
Task: 6202
(cherry picked from commit 0139f12c2e)
This commit is contained in:
Michael Johnson 2018-06-21 21:00:49 -07:00
parent e62663ce79
commit 72715ba619
23 changed files with 748 additions and 85 deletions

View File

@ -186,6 +186,12 @@
#
# rest_request_conn_timeout = 10
# rest_request_read_timeout = 60
#
# These "active" timeouts are used once the amphora should already
# be fully up and active. These values are lower than the other values to
# facilitate "fail fast" scenarios like failovers
# active_connection_max_retries = 15
# active_connection_rety_interval = 2
[controller_worker]
# workers = 1

View File

@ -21,6 +21,25 @@ import six
@six.add_metaclass(abc.ABCMeta)
class AmphoraLoadBalancerDriver(object):
@abc.abstractmethod
def update_amphora_listeners(self, listeners, amphora_id, timeout_dict):
"""Update the amphora with a new configuration.
:param listeners: List of listeners to update.
:type listener: list
:param amphora_id: The ID of the amphora to update
:type amphora_id: string
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
:returns: None
Builds a new configuration, pushes it to the amphora, and reloads
the listener on one amphora.
"""
pass
@abc.abstractmethod
def update(self, listener, vip):
"""Update the amphora with a new configuration.
@ -30,7 +49,7 @@ class AmphoraLoadBalancerDriver(object):
:type listener: object
:param vip: vip object, need to use its ip_address property
:type vip: object
:returns: return a value list (listener, vip, status flag--update)
:returns: None
At this moment, we just build the basic structure for testing, will
add more function along with the development.

View File

@ -58,6 +58,45 @@ class HaproxyAmphoraLoadBalancerDriver(
base_crt_dir=CONF.haproxy_amphora.base_cert_dir,
haproxy_template=CONF.haproxy_amphora.haproxy_template)
def update_amphora_listeners(self, listeners, amphora_index,
amphorae, timeout_dict=None):
"""Update the amphora with a new configuration.
:param listeners: List of listeners to update.
:type listener: list
:param amphora_id: The ID of the amphora to update
:type amphora_id: string
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
:returns: None
Updates the configuration of the listeners on a single amphora.
"""
# if the amphora does not yet have listeners, no need to update them.
if not listeners:
LOG.debug('No listeners found to update.')
return
amp = amphorae[amphora_index]
if amp is None or amp.status == consts.DELETED:
return
# TODO(johnsom) remove when we don't have a process per listener
for listener in listeners:
LOG.debug("%s updating listener %s on amphora %s",
self.__class__.__name__, listener.id, amp.id)
certs = self._process_tls_certificates(listener)
# Generate HaProxy configuration from listener object
config = self.jinja.build_config(
host_amphora=amp,
listener=listener,
tls_cert=certs['tls_cert'],
user_group=CONF.haproxy_amphora.user_group)
self.client.upload_config(amp, listener.id, config,
timeout_dict=timeout_dict)
self.client.reload_listener(amp, listener.id,
timeout_dict=timeout_dict)
def update(self, listener, vip):
LOG.debug("Amphora %s haproxy, updating listener %s, vip %s",
self.__class__.__name__, listener.protocol_port,
@ -83,25 +122,29 @@ class HaproxyAmphoraLoadBalancerDriver(
self.__class__.__name__, amp.id)
self.client.update_cert_for_rotation(amp, pem)
def _apply(self, func, listener=None, *args):
for amp in listener.load_balancer.amphorae:
if amp.status != consts.DELETED:
func(amp, listener.id, *args)
def _apply(self, func, listener=None, amphora=None, *args):
if amphora is None:
for amp in listener.load_balancer.amphorae:
if amp.status != consts.DELETED:
func(amp, listener.id, *args)
else:
if amphora.status != consts.DELETED:
func(amphora, listener.id, *args)
def stop(self, listener, vip):
self._apply(self.client.stop_listener, listener)
def start(self, listener, vip):
self._apply(self.client.start_listener, listener)
def start(self, listener, vip, amphora=None):
self._apply(self.client.start_listener, listener, amphora)
def delete(self, listener, vip):
self._apply(self.client.delete_listener, listener)
def get_info(self, amphora):
self.driver.get_info(amphora.lb_network_ip)
return self.client.get_info(amphora)
def get_diagnostics(self, amphora):
self.driver.get_diagnostics(amphora.lb_network_ip)
pass
def finalize_amphora(self, amphora):
pass
@ -184,7 +227,7 @@ class HaproxyAmphoraLoadBalancerDriver(
pem = cert_parser.build_pem(cert)
md5 = hashlib.md5(pem).hexdigest() # nosec
name = '{id}.pem'.format(id=cert.id)
self._apply(self._upload_cert, listener, pem, md5, name)
self._apply(self._upload_cert, listener, None, pem, md5, name)
return {'tls_cert': tls_cert, 'sni_certs': sni_certs}
@ -249,17 +292,27 @@ class AmphoraAPIClient(object):
port=CONF.haproxy_amphora.bind_port,
version=API_VERSION)
def request(self, method, amp, path='/', **kwargs):
def request(self, method, amp, path='/', timeout_dict=None, **kwargs):
cfg_ha_amp = CONF.haproxy_amphora
if timeout_dict is None:
timeout_dict = {}
req_conn_timeout = timeout_dict.get(
consts.REQ_CONN_TIMEOUT, cfg_ha_amp.rest_request_conn_timeout)
req_read_timeout = timeout_dict.get(
consts.REQ_READ_TIMEOUT, cfg_ha_amp.rest_request_read_timeout)
conn_max_retries = timeout_dict.get(
consts.CONN_MAX_RETRIES, cfg_ha_amp.connection_max_retries)
conn_retry_interval = timeout_dict.get(
consts.CONN_RETRY_INTERVAL, cfg_ha_amp.connection_retry_interval)
LOG.debug("request url %s", path)
_request = getattr(self.session, method.lower())
_url = self._base_url(amp.lb_network_ip) + path
LOG.debug("request url %s", _url)
timeout_tuple = (CONF.haproxy_amphora.rest_request_conn_timeout,
CONF.haproxy_amphora.rest_request_read_timeout)
reqargs = {
'verify': CONF.haproxy_amphora.server_ca,
'url': _url,
'timeout': timeout_tuple, }
'timeout': (req_conn_timeout, req_read_timeout), }
reqargs.update(kwargs)
headers = reqargs.setdefault('headers', {})
@ -267,7 +320,7 @@ class AmphoraAPIClient(object):
self.ssl_adapter.uuid = amp.id
exception = None
# Keep retrying
for a in six.moves.xrange(CONF.haproxy_amphora.connection_max_retries):
for a in six.moves.xrange(conn_max_retries):
try:
with warnings.catch_warnings():
warnings.filterwarnings(
@ -301,20 +354,20 @@ class AmphoraAPIClient(object):
except (requests.ConnectionError, requests.Timeout) as e:
exception = e
LOG.warning("Could not connect to instance. Retrying.")
time.sleep(CONF.haproxy_amphora.connection_retry_interval)
time.sleep(conn_retry_interval)
LOG.error("Connection retries (currently set to %(max_retries)s) "
"exhausted. The amphora is unavailable. Reason: "
"%(exception)s",
{'max_retries': CONF.haproxy_amphora.connection_max_retries,
{'max_retries': conn_max_retries,
'exception': exception})
raise driver_except.TimeOutException()
def upload_config(self, amp, listener_id, config):
def upload_config(self, amp, listener_id, config, timeout_dict=None):
r = self.put(
amp,
'listeners/{amphora_id}/{listener_id}/haproxy'.format(
amphora_id=amp.id, listener_id=listener_id),
amphora_id=amp.id, listener_id=listener_id), timeout_dict,
data=config)
return exc.check_exception(r)
@ -325,9 +378,9 @@ class AmphoraAPIClient(object):
if exc.check_exception(r):
return r.json()
def _action(self, action, amp, listener_id):
def _action(self, action, amp, listener_id, timeout_dict=None):
r = self.put(amp, 'listeners/{listener_id}/{action}'.format(
listener_id=listener_id, action=action))
listener_id=listener_id, action=action), timeout_dict=timeout_dict)
return exc.check_exception(r)
def upload_cert_pem(self, amp, listener_id, pem_filename, pem_file):
@ -396,7 +449,8 @@ class AmphoraAPIClient(object):
r = self.put(amp, 'vrrp/{action}'.format(action=action))
return exc.check_exception(r)
def get_interface(self, amp, ip_addr):
r = self.get(amp, 'interface/{ip_addr}'.format(ip_addr=ip_addr))
def get_interface(self, amp, ip_addr, timeout_dict=None):
r = self.get(amp, 'interface/{ip_addr}'.format(ip_addr=ip_addr),
timeout_dict=timeout_dict)
if exc.check_exception(r):
return r.json()

View File

@ -90,5 +90,6 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
self.client.reload_vrrp(amp)
def get_vrrp_interface(self, amphora):
return self.client.get_interface(amphora, amphora.vrrp_ip)['interface']
def get_vrrp_interface(self, amphora, timeout_dict=None):
return self.client.get_interface(
amphora, amphora.vrrp_ip, timeout_dict=timeout_dict)['interface']

View File

@ -37,6 +37,14 @@ class NoopManager(object):
super(NoopManager, self).__init__()
self.amphoraconfig = {}
def update_amphora_listeners(self, listeners, amphora_id, timeout_dict):
for listener in listeners:
LOG.debug("Amphora noop driver update_amphora_listeners, "
"listener %s, amphora %s, timeouts %s", listener.id,
amphora_id, timeout_dict)
self.amphoraconfig[(listener.id, amphora_id)] = (
listener, amphora_id, timeout_dict, "update_amp")
def update(self, listener, vip):
LOG.debug("Amphora %s no-op, update listener %s, vip %s",
self.__class__.__name__, listener.protocol_port,
@ -106,6 +114,11 @@ class NoopAmphoraLoadBalancerDriver(
super(NoopAmphoraLoadBalancerDriver, self).__init__()
self.driver = NoopManager()
def update_amphora_listeners(self, listeners, amphora_id, timeout_dict):
self.driver.update_amphora_listeners(listeners, amphora_id,
timeout_dict)
def update(self, listener, vip):
self.driver.update(listener, vip)

View File

@ -224,6 +224,13 @@ haproxy_amphora_opts = [
default=5,
help=_('Retry timeout between connection attempts in '
'seconds.')),
cfg.IntOpt('active_connection_max_retries',
default=15,
help=_('Retry threshold for connecting to active amphorae.')),
cfg.IntOpt('active_connection_rety_interval',
default=2,
help=_('Retry timeout between connection attempts in '
'seconds for active amphora.')),
cfg.IntOpt('build_rate_limit',
default=-1,
help=_('Number of amphorae that could be built per controller'

View File

@ -178,6 +178,7 @@ FAILED_AMPHORA = 'failed_amphora'
FAILOVER_AMPHORA = 'failover_amphora'
AMPHORAE = 'amphorae'
AMPHORA_ID = 'amphora_id'
AMPHORA_INDEX = 'amphora_index'
FAILOVER_AMPHORA_ID = 'failover_amphora_id'
DELTA = 'delta'
DELTAS = 'deltas'
@ -211,6 +212,21 @@ ADDED_PORTS = 'added_ports'
PORTS = 'ports'
MEMBER_PORTS = 'member_ports'
LOADBALANCER_TOPOLOGY = 'topology'
HEALTH_MONITOR_ID = 'health_monitor_id'
L7POLICY_ID = 'l7policy_id'
L7RULE_ID = 'l7rule_id'
LOAD_BALANCER_UPDATES = 'load_balancer_updates'
LISTENER_UPDATES = 'listener_updates'
POOL_UPDATES = 'pool_updates'
MEMBER_UPDATES = 'member_updates'
HEALTH_MONITOR_UPDATES = 'health_monitor_updates'
L7POLICY_UPDATES = 'l7policy_updates'
L7RULE_UPDATES = 'l7rule_updates'
TIMEOUT_DICT = 'timeout_dict'
REQ_CONN_TIMEOUT = 'req_conn_timeout'
REQ_READ_TIMEOUT = 'req_read_timeout'
CONN_MAX_RETRIES = 'conn_max_retries'
CONN_RETRY_INTERVAL = 'conn_retry_interval'
CERT_ROTATE_AMPHORA_FLOW = 'octavia-cert-rotate-amphora-flow'
CREATE_AMPHORA_FLOW = 'octavia-create-amphora-flow'
@ -244,6 +260,7 @@ UPDATE_MEMBER_FLOW = 'octavia-update-member-flow'
UPDATE_POOL_FLOW = 'octavia-update-pool-flow'
UPDATE_L7POLICY_FLOW = 'octavia-update-l7policy-flow'
UPDATE_L7RULE_FLOW = 'octavia-update-l7rule-flow'
UPDATE_AMPS_SUBFLOW = 'octavia-update-amps-subflow'
POST_MAP_AMP_TO_LB_SUBFLOW = 'octavia-post-map-amp-to-lb-subflow'
CREATE_AMP_FOR_LB_SUBFLOW = 'octavia-create-amp-for-lb-subflow'
@ -277,6 +294,8 @@ AMP_VRRP_STOP = 'octavia-amphora-vrrp-stop'
AMP_UPDATE_VRRP_INTF = 'octavia-amphora-update-vrrp-intf'
CREATE_VRRP_GROUP_FOR_LB = 'octavia-create-vrrp-group-for-lb'
CREATE_VRRP_SECURITY_RULES = 'octavia-create-vrrp-security-rules'
AMP_COMPUTE_CONNECTIVITY_WAIT = 'octavia-amp-compute-connectivity-wait'
AMP_LISTENER_UPDATE = 'octavia-amp-listeners-update'
GENERATE_SERVER_PEM_TASK = 'GenerateServerPEMTask'

View File

@ -674,15 +674,14 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
# if we run with anti-affinity we need to set the server group
# as well
if CONF.nova.enable_anti_affinity:
lb = self._amphora_repo.get_lb_for_amphora(
db_apis.get_session(), amp.id)
if lb:
stored_params[constants.SERVER_GROUP_ID] = lb.server_group_id
lb = self._amphora_repo.get_lb_for_amphora(
db_apis.get_session(), amp.id)
if CONF.nova.enable_anti_affinity and lb:
stored_params[constants.SERVER_GROUP_ID] = lb.server_group_id
failover_amphora_tf = self._taskflow_load(
self._amphora_flows.get_failover_flow(role=amp.role,
status=amp.status),
self._amphora_flows.get_failover_flow(
role=amp.role, load_balancer=lb),
store=stored_params)
with tf_logging.DynamicLoggingListener(
@ -701,6 +700,10 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
try:
amp = self._amphora_repo.get(db_apis.get_session(),
id=amphora_id)
if not amp:
LOG.warning("Could not fetch Amphora %s from DB, ignoring "
"failover request.", amphora_id)
return
self._perform_amphora_failover(
amp, constants.LB_CREATE_FAILOVER_PRIORITY)
if amp.load_balancer_id:

View File

@ -16,6 +16,7 @@
from oslo_config import cfg
from taskflow.patterns import graph_flow
from taskflow.patterns import linear_flow
from taskflow.patterns import unordered_flow
from octavia.common import constants
from octavia.controller.worker.tasks import amphora_driver_tasks
@ -62,12 +63,15 @@ class AmphoraFlows(object):
provides=constants.COMPUTE_ID))
create_amphora_flow.add(database_tasks.MarkAmphoraBootingInDB(
requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
create_amphora_flow.add(compute_tasks.ComputeWait(
create_amphora_flow.add(compute_tasks.ComputeActiveWait(
requires=(constants.COMPUTE_ID, constants.AMPHORA_ID),
provides=constants.COMPUTE_OBJ))
create_amphora_flow.add(database_tasks.UpdateAmphoraInfo(
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
provides=constants.AMPHORA))
create_amphora_flow.add(
amphora_driver_tasks.AmphoraComputeConnectivityWait(
requires=constants.AMPHORA))
create_amphora_flow.add(database_tasks.ReloadAmphora(
requires=constants.AMPHORA_ID,
provides=constants.AMPHORA))
@ -172,7 +176,7 @@ class AmphoraFlows(object):
create_amp_for_lb_subflow.add(database_tasks.MarkAmphoraBootingInDB(
name=sf_name + '-' + constants.MARK_AMPHORA_BOOTING_INDB,
requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
create_amp_for_lb_subflow.add(compute_tasks.ComputeWait(
create_amp_for_lb_subflow.add(compute_tasks.ComputeActiveWait(
name=sf_name + '-' + constants.COMPUTE_WAIT,
requires=(constants.COMPUTE_ID, constants.AMPHORA_ID),
provides=constants.COMPUTE_OBJ))
@ -180,6 +184,10 @@ class AmphoraFlows(object):
name=sf_name + '-' + constants.UPDATE_AMPHORA_INFO,
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
provides=constants.AMPHORA))
create_amp_for_lb_subflow.add(
amphora_driver_tasks.AmphoraComputeConnectivityWait(
name=sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT,
requires=constants.AMPHORA))
create_amp_for_lb_subflow.add(amphora_driver_tasks.AmphoraFinalize(
name=sf_name + '-' + constants.AMPHORA_FINALIZE,
requires=constants.AMPHORA))
@ -290,7 +298,7 @@ class AmphoraFlows(object):
return delete_amphora_flow
def get_failover_flow(self, role=constants.ROLE_STANDALONE,
status=constants.AMPHORA_READY):
load_balancer=None):
"""Creates a flow to failover a stale amphora
:returns: The flow for amphora failover
@ -329,16 +337,16 @@ class AmphoraFlows(object):
failover_amphora_flow.add(network_tasks.WaitForPortDetach(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
failover_amphora_flow.add(
database_tasks.DisableAmphoraHealthMonitoring(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
failover_amphora_flow.add(database_tasks.MarkAmphoraDeletedInDB(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
# If this is an unallocated amp (spares pool), we're done
if status != constants.AMPHORA_ALLOCATED:
if not load_balancer:
failover_amphora_flow.add(
database_tasks.DisableAmphoraHealthMonitoring(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
return failover_amphora_flow
# Save failed amphora details for later
@ -373,9 +381,38 @@ class AmphoraFlows(object):
provides=constants.AMPHORAE_NETWORK_CONFIG))
failover_amphora_flow.add(database_tasks.GetListenersFromLoadbalancer(
requires=constants.LOADBALANCER, provides=constants.LISTENERS))
failover_amphora_flow.add(database_tasks.GetAmphoraeFromLoadbalancer(
requires=constants.LOADBALANCER, provides=constants.AMPHORAE))
failover_amphora_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=(constants.LOADBALANCER, constants.LISTENERS)))
# Listeners update needs to be run on all amphora to update
# their peer configurations. So parallelize this with an
# unordered subflow.
update_amps_subflow = unordered_flow.Flow(
constants.UPDATE_AMPS_SUBFLOW)
timeout_dict = {
constants.CONN_MAX_RETRIES:
CONF.haproxy_amphora.active_connection_max_retries,
constants.CONN_RETRY_INTERVAL:
CONF.haproxy_amphora.active_connection_rety_interval}
# Setup parallel flows for each amp. We don't know the new amp
# details at flow creation time, so setup a subflow for each
# amp on the LB, they let the task index into a list of amps
# to find the amphora it should work on.
amp_index = 0
for amp in load_balancer.amphorae:
if amp.status == constants.DELETED:
continue
update_amps_subflow.add(
amphora_driver_tasks.AmpListenersUpdate(
name=constants.AMP_LISTENER_UPDATE + '-' + str(amp_index),
requires=(constants.LISTENERS, constants.AMPHORAE),
inject={constants.AMPHORA_INDEX: amp_index,
constants.TIMEOUT_DICT: timeout_dict}))
amp_index += 1
failover_amphora_flow.add(update_amps_subflow)
# Plug the VIP ports into the new amphora
failover_amphora_flow.add(network_tasks.PlugVIPPort(
@ -385,13 +422,22 @@ class AmphoraFlows(object):
constants.AMPHORAE_NETWORK_CONFIG)))
# Plug the member networks into the new amphora
failover_amphora_flow.add(network_tasks.CalculateDelta(
requires=constants.LOADBALANCER, provides=constants.DELTAS))
failover_amphora_flow.add(network_tasks.HandleNetworkDeltas(
requires=constants.DELTAS, provides=constants.ADDED_PORTS))
failover_amphora_flow.add(network_tasks.CalculateAmphoraDelta(
requires=(constants.LOADBALANCER, constants.AMPHORA),
provides=constants.DELTA))
failover_amphora_flow.add(network_tasks.HandleNetworkDelta(
requires=(constants.AMPHORA, constants.DELTA),
provides=constants.ADDED_PORTS))
failover_amphora_flow.add(amphora_driver_tasks.AmphoraePostNetworkPlug(
requires=(constants.LOADBALANCER, constants.ADDED_PORTS)))
failover_amphora_flow.add(database_tasks.ReloadLoadBalancer(
name='octavia-failover-LB-reload-2',
requires=constants.LOADBALANCER_ID,
provides=constants.LOADBALANCER))
# Handle the amphora role and VRRP if necessary
if role == constants.ROLE_MASTER:
failover_amphora_flow.add(database_tasks.MarkAmphoraMasterInDB(
@ -412,7 +458,12 @@ class AmphoraFlows(object):
requires=constants.AMPHORA))
failover_amphora_flow.add(amphora_driver_tasks.ListenersStart(
requires=(constants.LOADBALANCER, constants.LISTENERS)))
requires=(constants.LOADBALANCER, constants.LISTENERS,
constants.AMPHORA)))
failover_amphora_flow.add(
database_tasks.DisableAmphoraHealthMonitoring(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
return failover_amphora_flow

View File

@ -21,6 +21,7 @@ from stevedore import driver as stevedore_driver
from taskflow import task
from taskflow.types import failure
from octavia.amphorae.driver_exceptions import exceptions as driver_except
from octavia.common import constants
from octavia.controller.worker import task_utils as task_utilities
from octavia.db import api as db_apis
@ -46,6 +47,25 @@ class BaseAmphoraTask(task.Task):
self.task_utils = task_utilities.TaskUtils()
class AmpListenersUpdate(BaseAmphoraTask):
"""Task to update the listeners on one amphora."""
def execute(self, listeners, amphora_index, amphorae, timeout_dict=()):
# Note, we don't want this to cause a revert as it may be used
# in a failover flow with both amps failing. Skip it and let
# health manager fix it.
try:
self.amphora_driver.update_amphora_listeners(
listeners, amphora_index, amphorae, timeout_dict)
except Exception as e:
amphora_id = amphorae[amphora_index].id
LOG.error('Failed to update listeners on amphora %s. Skipping '
'this amphora as it is failing to update due to: %s',
amphora_id, str(e))
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
class ListenersUpdate(BaseAmphoraTask):
"""Task to update amphora with all specified listeners' configurations."""
@ -105,10 +125,10 @@ class ListenerStart(BaseAmphoraTask):
class ListenersStart(BaseAmphoraTask):
"""Task to start all listeners on the vip."""
def execute(self, loadbalancer, listeners):
def execute(self, loadbalancer, listeners, amphora=None):
"""Execute listener start routines for listeners on an amphora."""
for listener in listeners:
self.amphora_driver.start(listener, loadbalancer.vip)
self.amphora_driver.start(listener, loadbalancer.vip, amphora)
LOG.debug("Started the listeners on the vip")
def revert(self, listeners, *args, **kwargs):
@ -262,11 +282,27 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask):
def execute(self, loadbalancer):
"""Execute post_vip_routine."""
amps = []
timeout_dict = {
constants.CONN_MAX_RETRIES:
CONF.haproxy_amphora.active_connection_max_retries,
constants.CONN_RETRY_INTERVAL:
CONF.haproxy_amphora.active_connection_rety_interval}
for amp in six.moves.filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
# Currently this is supported only with REST Driver
interface = self.amphora_driver.get_vrrp_interface(amp)
try:
interface = self.amphora_driver.get_vrrp_interface(
amp, timeout_dict=timeout_dict)
except Exception as e:
# This can occur when an active/standby LB has no listener
LOG.error('Failed to get amphora VRRP interface on amphora '
'%s. Skipping this amphora as it is failing due to: '
'%s', amp.id, str(e))
self.amphora_repo.update(db_apis.get_session(), amp.id,
status=constants.ERROR)
continue
self.amphora_repo.update(db_apis.get_session(), amp.id,
vrrp_interface=interface)
amps.append(self.amphora_repo.get(db_apis.get_session(),
@ -318,3 +354,22 @@ class AmphoraVRRPStart(BaseAmphoraTask):
self.amphora_driver.start_vrrp_service(loadbalancer)
LOG.debug("Started VRRP of loadbalancer %s amphorae",
loadbalancer.id)
class AmphoraComputeConnectivityWait(BaseAmphoraTask):
""""Task to wait for the compute instance to be up."""
def execute(self, amphora):
"""Execute get_info routine for an amphora until it responds."""
try:
amp_info = self.amphora_driver.get_info(amphora)
LOG.debug('Successfuly connected to amphora %s: %s',
amphora.id, amp_info)
except driver_except.TimeOutException:
LOG.error("Amphora compute instance failed to become reachable. "
"This either means the compute driver failed to fully "
"boot the instance inside the timeout interval or the "
"instance is not reachable via the lb-mgmt-net.")
self.amphora_repo.update(db_apis.get_session(), amphora.id,
status=constants.ERROR)
raise

View File

@ -176,7 +176,7 @@ class ComputeDelete(BaseComputeTask):
raise
class ComputeWait(BaseComputeTask):
class ComputeActiveWait(BaseComputeTask):
"""Wait for the compute driver to mark the amphora active."""
def execute(self, compute_id, amphora_id):

View File

@ -1525,6 +1525,25 @@ class GetAmphoraDetails(BaseDatabaseTask):
vrrp_priority=amphora.vrrp_priority)
class GetAmphoraeFromLoadbalancer(BaseDatabaseTask):
"""Task to pull the listeners from a loadbalancer."""
def execute(self, loadbalancer):
"""Pull the amphorae from a loadbalancer.
:param loadbalancer: Load balancer which listeners are required
:returns: A list of Listener objects
"""
amphorae = []
for amp in loadbalancer.amphorae:
a = self.amphora_repo.get(db_apis.get_session(), id=amp.id,
show_deleted=False)
if a is None:
continue
amphorae.append(a)
return amphorae
class GetListenersFromLoadbalancer(BaseDatabaseTask):
"""Task to pull the listeners from a loadbalancer."""
@ -1537,6 +1556,7 @@ class GetListenersFromLoadbalancer(BaseDatabaseTask):
listeners = []
for listener in loadbalancer.listeners:
l = self.listener_repo.get(db_apis.get_session(), id=listener.id)
l.load_balancer = loadbalancer
listeners.append(l)
return listeners

View File

@ -216,6 +216,55 @@ class GetMemberPorts(BaseNetworkTask):
return member_ports
class HandleNetworkDelta(BaseNetworkTask):
"""Task to plug and unplug networks
Plug or unplug networks based on delta
"""
def execute(self, amphora, delta):
"""Handle network plugging based off deltas."""
added_ports = {}
added_ports[amphora.id] = []
for nic in delta.add_nics:
interface = self.network_driver.plug_network(delta.compute_id,
nic.network_id)
port = self.network_driver.get_port(interface.port_id)
port.network = self.network_driver.get_network(port.network_id)
for fixed_ip in port.fixed_ips:
fixed_ip.subnet = self.network_driver.get_subnet(
fixed_ip.subnet_id)
added_ports[amphora.id].append(port)
for nic in delta.delete_nics:
try:
self.network_driver.unplug_network(delta.compute_id,
nic.network_id)
except base.NetworkNotFound:
LOG.debug("Network %d not found ", nic.network_id)
except Exception:
LOG.exception("Unable to unplug network")
return added_ports
def revert(self, result, amphora, delta, *args, **kwargs):
"""Handle a network plug or unplug failures."""
if isinstance(result, failure.Failure):
return
if not delta:
return
LOG.warning("Unable to plug networks for amp id %s",
delta.amphora_id)
for nic in delta.add_nics:
try:
self.network_driver.unplug_network(delta.compute_id,
nic.network_id)
except Exception:
pass
class HandleNetworkDeltas(BaseNetworkTask):
"""Task to plug and unplug networks

View File

@ -24,6 +24,7 @@ import six
from octavia.amphorae.driver_exceptions import exceptions as driver_except
from octavia.amphorae.drivers.haproxy import exceptions as exc
from octavia.amphorae.drivers.haproxy import rest_api_driver as driver
from octavia.common import constants
from octavia.db import models
from octavia.network import data_models as network_models
from octavia.tests.unit import base
@ -47,6 +48,9 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
def setUp(self):
super(TestHaproxyAmphoraLoadBalancerDriverTest, self).setUp()
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="haproxy_amphora", user_group="everyone")
DEST1 = '198.51.100.0/24'
DEST2 = '203.0.113.0/24'
NEXTHOP = '192.0.2.1'
@ -84,6 +88,50 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
'mtu': FAKE_MTU,
'host_routes': host_routes_data}
self.timeout_dict = {constants.REQ_CONN_TIMEOUT: 1,
constants.REQ_READ_TIMEOUT: 2,
constants.CONN_MAX_RETRIES: 3,
constants.CONN_RETRY_INTERVAL: 4}
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
def test_update_amphora_listeners(self, mock_load_cert):
mock_amphora = mock.MagicMock()
mock_amphora.id = uuidutils.generate_uuid()
mock_listener = mock.MagicMock()
mock_listener.id = uuidutils.generate_uuid()
mock_load_cert.return_value = {'tls_cert': None, 'sni_certs': []}
self.driver.jinja.build_config.return_value = 'the_config'
self.driver.update_amphora_listeners(None, 1, [],
self.timeout_dict)
mock_load_cert.assert_not_called()
self.driver.jinja.build_config.assert_not_called()
self.driver.client.upload_config.assert_not_called()
self.driver.client.reload_listener.assert_not_called()
self.driver.update_amphora_listeners([mock_listener], 0,
[mock_amphora], self.timeout_dict)
self.driver.jinja.build_config.assert_called_once_with(
host_amphora=mock_amphora, listener=mock_listener,
tls_cert=None, user_group="everyone")
self.driver.client.upload_config.assert_called_once_with(
mock_amphora, mock_listener.id, 'the_config',
timeout_dict=self.timeout_dict)
self.driver.client.reload_listener(mock_amphora, mock_listener.id,
timeout_dict=self.timeout_dict)
mock_load_cert.reset_mock()
self.driver.jinja.build_config.reset_mock()
self.driver.client.upload_config.reset_mock()
self.driver.client.reload_listener.reset_mock()
mock_amphora.status = constants.DELETED
self.driver.update_amphora_listeners([mock_listener], 0,
[mock_amphora], self.timeout_dict)
mock_load_cert.assert_not_called()
self.driver.jinja.build_config.assert_not_called()
self.driver.client.upload_config.assert_not_called()
self.driver.client.reload_listener.assert_not_called()
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
@mock.patch('octavia.common.tls_utils.cert_parser.get_host_names')
def test_update(self, mock_cert, mock_load_crt):
@ -157,11 +205,29 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
self.amp, self.sl.id)
def test_start(self):
amp1 = mock.MagicMock()
amp2 = mock.MagicMock()
amp2.status = constants.DELETED
listener = mock.MagicMock()
listener.id = uuidutils.generate_uuid()
listener.load_balancer.amphorae = [amp1, amp2]
# Execute driver method
self.driver.start(self.sl, self.sv)
self.driver.start(listener, self.sv)
self.driver.client.start_listener.assert_called_once_with(
amp1, listener.id)
def test_start_with_amphora(self):
# Execute driver method
amp = mock.MagicMock()
self.driver.start(self.sl, self.sv, self.amp)
self.driver.client.start_listener.assert_called_once_with(
self.amp, self.sl.id)
self.driver.client.start_listener.reset_mock()
amp.status = constants.DELETED
self.driver.start(self.sl, self.sv, amp)
self.driver.client.start_listener.assert_not_called()
def test_delete(self):
# Execute driver method
self.driver.delete(self.sl, self.sv)
@ -169,13 +235,19 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
self.amp, self.sl.id)
def test_get_info(self):
pass
self.driver.client.get_info.return_value = 'FAKE_INFO'
result = self.driver.get_info(self.amp)
self.assertEqual('FAKE_INFO', result)
def test_get_diagnostics(self):
pass
# TODO(johnsom) Implement once this exists on the amphora agent.
result = self.driver.get_diagnostics(self.amp)
self.assertIsNone(result)
def test_finalize_amphora(self):
pass
# TODO(johnsom) Implement once this exists on the amphora agent.
result = self.driver.finalize_amphora(self.amp)
self.assertIsNone(result)
def test_post_vip_plug(self):
amphorae_network_config = mock.MagicMock()
@ -249,7 +321,7 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
def test_get_vrrp_interface(self):
self.driver.get_vrrp_interface(self.amp)
self.driver.client.get_interface.assert_called_once_with(
self.amp, self.amp.vrrp_ip)
self.amp, self.amp.vrrp_ip, timeout_dict=None)
class TestAmphoraAPIClientTest(base.TestCase):
@ -270,6 +342,10 @@ class TestAmphoraAPIClientTest(base.TestCase):
'vrrp_ip': self.amp.vrrp_ip}
patcher = mock.patch('time.sleep').start()
self.addCleanup(patcher.stop)
self.timeout_dict = {constants.REQ_CONN_TIMEOUT: 1,
constants.REQ_READ_TIMEOUT: 2,
constants.CONN_MAX_RETRIES: 3,
constants.CONN_RETRY_INTERVAL: 4}
def test_base_url(self):
url = self.driver._base_url(FAKE_IP)
@ -283,8 +359,8 @@ class TestAmphoraAPIClientTest(base.TestCase):
@mock.patch('octavia.amphorae.drivers.haproxy.rest_api_driver.time.sleep')
def test_request(self, mock_sleep, mock_get):
self.assertRaises(driver_except.TimeOutException,
self.driver.request,
'get', self.amp, 'unavailableURL')
self.driver.request, 'get', self.amp,
'unavailableURL', self.timeout_dict)
@requests_mock.mock()
def test_get_info(self, m):

View File

@ -15,6 +15,7 @@
from oslo_utils import uuidutils
from octavia.amphorae.drivers.noop_driver import driver
from octavia.common import constants
from octavia.common import data_models
from octavia.network import data_models as network_models
from octavia.tests.unit import base
@ -44,6 +45,7 @@ class TestNoopAmphoraLoadBalancerDriver(base.TestCase):
super(TestNoopAmphoraLoadBalancerDriver, self).setUp()
self.driver = driver.NoopAmphoraLoadBalancerDriver()
self.listener = data_models.Listener()
self.listener.id = uuidutils.generate_uuid()
self.listener.protocol_port = 80
self.vip = data_models.Vip()
self.vip.ip_address = "10.0.0.1"
@ -61,6 +63,19 @@ class TestNoopAmphoraLoadBalancerDriver(base.TestCase):
vip_subnet=network_models.Subnet(id=self.FAKE_UUID_1))
}
self.pem_file = 'test_pem_file'
self.timeout_dict = {constants.REQ_CONN_TIMEOUT: 1,
constants.REQ_READ_TIMEOUT: 2,
constants.CONN_MAX_RETRIES: 3,
constants.CONN_RETRY_INTERVAL: 4}
def test_update_amphora_listeners(self):
self.driver.update_amphora_listeners([self.listener], self.amphora.id,
self.timeout_dict)
self.assertEqual((self.listener, self.amphora.id, self.timeout_dict,
'update_amp'),
self.driver.driver.amphoraconfig[(
self.listener.id,
self.amphora.id)])
def test_update(self):
self.driver.update(self.listener, self.vip)

View File

@ -19,6 +19,7 @@ from oslo_config import fixture as oslo_fixture
from taskflow.patterns import linear_flow as flow
from octavia.common import constants
from octavia.common import data_models
from octavia.controller.worker.flows import amphora_flows
import octavia.tests.unit.base as base
@ -38,6 +39,11 @@ class TestAmphoraFlows(base.TestCase):
amphora_driver='amphora_haproxy_rest_driver')
self.conf.config(group="nova", enable_anti_affinity=False)
self.AmpFlow = amphora_flows.AmphoraFlows()
self.amp1 = data_models.Amphora(id=1)
self.amp2 = data_models.Amphora(id=2)
self.amp3 = data_models.Amphora(id=3, status=constants.DELETED)
self.lb = data_models.LoadBalancer(
id=4, amphorae=[self.amp1, self.amp2, self.amp3])
def test_get_create_amphora_flow(self, mock_get_net_driver):
@ -237,7 +243,7 @@ class TestAmphoraFlows(base.TestCase):
def test_get_failover_flow_allocated(self, mock_get_net_driver):
amp_flow = self.AmpFlow.get_failover_flow(
status=constants.AMPHORA_ALLOCATED)
load_balancer=self.lb)
self.assertIsInstance(amp_flow, flow.Flow)
@ -254,10 +260,10 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(3, len(amp_flow.requires))
self.assertEqual(11, len(amp_flow.provides))
self.assertEqual(12, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(
role=constants.ROLE_MASTER, status=constants.AMPHORA_ALLOCATED)
role=constants.ROLE_MASTER, load_balancer=self.lb)
self.assertIsInstance(amp_flow, flow.Flow)
@ -274,10 +280,10 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(3, len(amp_flow.requires))
self.assertEqual(11, len(amp_flow.provides))
self.assertEqual(12, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(
role=constants.ROLE_BACKUP, status=constants.AMPHORA_ALLOCATED)
role=constants.ROLE_BACKUP, load_balancer=self.lb)
self.assertIsInstance(amp_flow, flow.Flow)
@ -294,10 +300,10 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(3, len(amp_flow.requires))
self.assertEqual(11, len(amp_flow.provides))
self.assertEqual(12, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(
role='BOGUSROLE', status=constants.AMPHORA_ALLOCATED)
role='BOGUSROLE', load_balancer=self.lb)
self.assertIsInstance(amp_flow, flow.Flow)
@ -314,12 +320,11 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(3, len(amp_flow.requires))
self.assertEqual(11, len(amp_flow.provides))
self.assertEqual(12, len(amp_flow.provides))
def test_get_failover_flow_spare(self, mock_get_net_driver):
amp_flow = self.AmpFlow.get_failover_flow(
status=constants.AMPHORA_READY)
amp_flow = self.AmpFlow.get_failover_flow()
self.assertIsInstance(amp_flow, flow.Flow)

View File

@ -14,9 +14,12 @@
#
import mock
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
from taskflow.types import failure
from octavia.amphorae.driver_exceptions import exceptions as driver_except
from octavia.common import constants
from octavia.common import data_models
from octavia.controller.worker.tasks import amphora_driver_tasks
@ -28,6 +31,8 @@ AMP_ID = uuidutils.generate_uuid()
COMPUTE_ID = uuidutils.generate_uuid()
LISTENER_ID = uuidutils.generate_uuid()
LB_ID = uuidutils.generate_uuid()
CONN_MAX_RETRIES = 10
CONN_RETRY_INTERVAL = 6
_amphora_mock = mock.MagicMock()
_amphora_mock.id = AMP_ID
@ -61,8 +66,42 @@ class TestAmphoraDriverTasks(base.TestCase):
_LB_mock.amphorae = [_amphora_mock]
_LB_mock.id = LB_ID
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="haproxy_amphora",
active_connection_max_retries=CONN_MAX_RETRIES)
conf.config(group="haproxy_amphora",
active_connection_rety_interval=CONN_RETRY_INTERVAL)
super(TestAmphoraDriverTasks, self).setUp()
def test_amp_listener_update(self,
mock_driver,
mock_generate_uuid,
mock_log,
mock_get_session,
mock_listener_repo_get,
mock_listener_repo_update,
mock_amphora_repo_update):
timeout_dict = {constants.REQ_CONN_TIMEOUT: 1,
constants.REQ_READ_TIMEOUT: 2,
constants.CONN_MAX_RETRIES: 3,
constants.CONN_RETRY_INTERVAL: 4}
amp_list_update_obj = amphora_driver_tasks.AmpListenersUpdate()
amp_list_update_obj.execute([_listener_mock], 0,
[_amphora_mock], timeout_dict)
mock_driver.update_amphora_listeners.assert_called_once_with(
[_listener_mock], 0, [_amphora_mock], timeout_dict)
mock_driver.update_amphora_listeners.side_effect = Exception('boom')
amp_list_update_obj.execute([_listener_mock], 0,
[_amphora_mock], timeout_dict)
mock_amphora_repo_update.assert_called_once_with(
_session_mock, AMP_ID, status=constants.ERROR)
def test_listener_update(self,
mock_driver,
mock_generate_uuid,
@ -480,10 +519,15 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_listener_repo_update,
mock_amphora_repo_update):
_LB_mock.amphorae = _amphorae_mock
timeout_dict = {constants.CONN_MAX_RETRIES: CONN_MAX_RETRIES,
constants.CONN_RETRY_INTERVAL: CONN_RETRY_INTERVAL}
amphora_update_vrrp_interface_obj = (
amphora_driver_tasks.AmphoraUpdateVRRPInterface())
amphora_update_vrrp_interface_obj.execute(_LB_mock)
mock_driver.get_vrrp_interface.assert_called_once_with(_amphora_mock)
mock_driver.get_vrrp_interface.assert_called_once_with(
_amphora_mock, timeout_dict=timeout_dict)
# Test revert
mock_driver.reset_mock()
@ -550,3 +594,22 @@ class TestAmphoraDriverTasks(base.TestCase):
amphora_driver_tasks.AmphoraVRRPStart())
amphora_vrrp_start_obj.execute(_LB_mock)
mock_driver.start_vrrp_service.assert_called_once_with(_LB_mock)
def test_amphora_compute_connectivity_wait(self,
mock_driver,
mock_generate_uuid,
mock_log,
mock_get_session,
mock_listener_repo_get,
mock_listener_repo_update,
mock_amphora_repo_update):
amp_compute_conn_wait_obj = (
amphora_driver_tasks.AmphoraComputeConnectivityWait())
amp_compute_conn_wait_obj.execute(_amphora_mock)
mock_driver.get_info.assert_called_once_with(_amphora_mock)
mock_driver.get_info.side_effect = driver_except.TimeOutException()
self.assertRaises(driver_except.TimeOutException,
amp_compute_conn_wait_obj.execute, _amphora_mock)
mock_amphora_repo_update.assert_called_once_with(
_session_mock, AMP_ID, status=constants.ERROR)

View File

@ -339,7 +339,7 @@ class TestComputeTasks(base.TestCase):
mock_driver.get_amphora.return_value = _amphora_mock, None
computewait = compute_tasks.ComputeWait()
computewait = compute_tasks.ComputeActiveWait()
computewait.execute(COMPUTE_ID, AMPHORA_ID)
mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID)
@ -366,7 +366,7 @@ class TestComputeTasks(base.TestCase):
mock_driver.get_amphora.return_value = _amphora_mock, None
computewait = compute_tasks.ComputeWait()
computewait = compute_tasks.ComputeActiveWait()
computewait.execute(COMPUTE_ID, AMPHORA_ID)
mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID)
@ -391,7 +391,7 @@ class TestComputeTasks(base.TestCase):
mock_driver.get_amphora.return_value = _amphora_mock, None
computewait = compute_tasks.ComputeWait()
computewait = compute_tasks.ComputeActiveWait()
computewait.execute(COMPUTE_ID, AMPHORA_ID)
mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID)

View File

@ -1735,6 +1735,59 @@ class TestDatabaseTasks(base.TestCase):
repo.AmphoraRepository.update.assert_called_once_with(
'TEST', AMP_ID, role=None, vrrp_priority=None)
@mock.patch('octavia.db.repositories.AmphoraRepository.get')
def test_get_amphorae_from_loadbalancer(self,
mock_amphora_get,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
amp1 = mock.MagicMock()
amp1.id = uuidutils.generate_uuid()
amp2 = mock.MagicMock()
amp2.id = uuidutils.generate_uuid()
lb = mock.MagicMock()
lb.amphorae = [amp1, amp2]
mock_amphora_get.side_effect = [_amphora_mock, None]
get_amps_from_lb_obj = database_tasks.GetAmphoraeFromLoadbalancer()
result = get_amps_from_lb_obj.execute(lb)
self.assertEqual([_amphora_mock], result)
@mock.patch('octavia.db.repositories.ListenerRepository.get')
def test_get_listeners_from_loadbalancer(self,
mock_listener_get,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mock_listener_get.return_value = _listener_mock
_loadbalancer_mock.listeners = [_listener_mock]
get_list_from_lb_obj = database_tasks.GetListenersFromLoadbalancer()
result = get_list_from_lb_obj.execute(_loadbalancer_mock)
mock_listener_get.assert_called_once_with('TEST', id=_listener_mock.id)
self.assertEqual([_listener_mock], result)
def test_get_vip_from_loadbalancer(self,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
_loadbalancer_mock.vip = _vip_mock
get_vip_from_lb_obj = database_tasks.GetVipFromLoadbalancer()
result = get_vip_from_lb_obj.execute(_loadbalancer_mock)
self.assertEqual(_vip_mock, result)
@mock.patch('octavia.db.repositories.VRRPGroupRepository.create')
def test_create_vrrp_group_for_lb(self,
mock_vrrp_group_create,

View File

@ -17,6 +17,7 @@ import mock
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
from taskflow.types import failure
from octavia.common import constants
from octavia.common import data_models as o_data_models
@ -296,6 +297,69 @@ class TestNetworkTasks(base.TestCase):
self.assertEqual([port_mock], ports)
def test_handle_network_delta(self, mock_get_net_driver):
mock_net_driver = mock.MagicMock()
mock_get_net_driver.return_value = mock_net_driver
nic1 = mock.MagicMock()
nic1.network_id = uuidutils.generate_uuid()
nic2 = mock.MagicMock()
nic2.network_id = uuidutils.generate_uuid()
interface1 = mock.MagicMock()
interface1.port_id = uuidutils.generate_uuid()
port1 = mock.MagicMock()
port1.network_id = uuidutils.generate_uuid()
fixed_ip = mock.MagicMock()
fixed_ip.subnet_id = uuidutils.generate_uuid()
port1.fixed_ips = [fixed_ip]
subnet = mock.MagicMock()
network = mock.MagicMock()
delta = data_models.Delta(amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=[nic1],
delete_nics=[nic2, nic2, nic2])
mock_net_driver.plug_network.return_value = interface1
mock_net_driver.get_port.return_value = port1
mock_net_driver.get_network.return_value = network
mock_net_driver.get_subnet.return_value = subnet
mock_net_driver.unplug_network.side_effect = [
None, net_base.NetworkNotFound, Exception]
handle_net_delta_obj = network_tasks.HandleNetworkDelta()
result = handle_net_delta_obj.execute(self.amphora_mock, delta)
mock_net_driver.plug_network.assert_called_once_with(
self.amphora_mock.compute_id, nic1.network_id)
mock_net_driver.get_port.assert_called_once_with(interface1.port_id)
mock_net_driver.get_network.assert_called_once_with(port1.network_id)
mock_net_driver.get_subnet.assert_called_once_with(fixed_ip.subnet_id)
self.assertEqual({self.amphora_mock.id: [port1]}, result)
mock_net_driver.unplug_network.assert_called_with(
self.amphora_mock.compute_id, nic2.network_id)
# Revert
delta2 = data_models.Delta(amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=[nic1, nic1],
delete_nics=[nic2, nic2, nic2])
mock_net_driver.unplug_network.reset_mock()
handle_net_delta_obj.revert(
failure.Failure.from_exception(Exception('boom')), None, None)
mock_net_driver.unplug_network.assert_not_called()
mock_net_driver.unplug_network.reset_mock()
handle_net_delta_obj.revert(None, None, None)
mock_net_driver.unplug_network.assert_not_called()
mock_net_driver.unplug_network.reset_mock()
handle_net_delta_obj.revert(None, None, delta2)
def test_handle_network_deltas(self, mock_get_net_driver):
mock_driver = mock.MagicMock()
mock_get_net_driver.return_value = mock_driver

View File

@ -56,10 +56,20 @@ _l7rule_mock = mock.MagicMock()
_create_map_flow_mock = mock.MagicMock()
_amphora_mock.load_balancer_id = LB_ID
_amphora_mock.id = AMP_ID
_db_session = mock.MagicMock()
CONF = cfg.CONF
class TestException(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
@mock.patch('octavia.db.repositories.AmphoraRepository.get',
return_value=_amphora_mock)
@mock.patch('octavia.db.repositories.HealthMonitorRepository.get',
@ -79,7 +89,7 @@ CONF = cfg.CONF
@mock.patch('octavia.common.base_taskflow.BaseTaskFlowEngine._taskflow_load',
return_value=_flow_mock)
@mock.patch('taskflow.listeners.logging.DynamicLoggingListener')
@mock.patch('octavia.db.api.get_session', return_value='TEST')
@mock.patch('octavia.db.api.get_session', return_value=_db_session)
class TestControllerWorker(base.TestCase):
def setUp(self):
@ -162,7 +172,7 @@ class TestControllerWorker(base.TestCase):
cw.delete_amphora(AMP_ID)
mock_amp_repo_get.assert_called_once_with(
'TEST',
_db_session,
id=AMP_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
@ -576,7 +586,7 @@ class TestControllerWorker(base.TestCase):
cw.delete_load_balancer(LB_ID, cascade=False)
mock_lb_repo_get.assert_called_once_with(
'TEST',
_db_session,
id=LB_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
@ -614,7 +624,7 @@ class TestControllerWorker(base.TestCase):
cw.delete_load_balancer(LB_ID, cascade=True)
mock_lb_repo_get.assert_called_once_with(
'TEST',
_db_session,
id=LB_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
@ -656,7 +666,7 @@ class TestControllerWorker(base.TestCase):
cw.update_load_balancer(LB_ID, change)
mock_lb_repo_get.assert_called_once_with(
'TEST',
_db_session,
id=LB_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
@ -1137,9 +1147,81 @@ class TestControllerWorker(base.TestCase):
}))
_flow_mock.run.assert_called_once_with()
mock_update.assert_called_with('TEST', LB_ID,
mock_update.assert_called_with(_db_session, LB_ID,
provisioning_status=constants.ACTIVE)
@mock.patch('octavia.controller.worker.controller_worker.ControllerWorker.'
'_perform_amphora_failover')
def test_failover_amp_missing_amp(self,
mock_perform_amp_failover,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
mock_amp_repo_get):
mock_amp_repo_get.return_value = None
cw = controller_worker.ControllerWorker()
cw.failover_amphora(AMP_ID)
mock_perform_amp_failover.assert_not_called()
@mock.patch('octavia.controller.worker.controller_worker.ControllerWorker.'
'_perform_amphora_failover')
def test_failover_amp_flow_exception(self,
mock_perform_amp_failover,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
mock_amp_repo_get):
mock_perform_amp_failover.side_effect = TestException('boom')
cw = controller_worker.ControllerWorker()
self.assertRaises(TestException, cw.failover_amphora, AMP_ID)
@mock.patch('octavia.controller.worker.controller_worker.ControllerWorker.'
'_perform_amphora_failover')
@mock.patch('octavia.db.repositories.LoadBalancerRepository.update')
def test_failover_amp_no_lb(self,
mock_lb_update,
mock_perform_amp_failover,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
mock_amp_repo_get):
amphora = mock.MagicMock()
amphora.load_balancer_id = None
mock_amp_repo_get.return_value = amphora
cw = controller_worker.ControllerWorker()
cw.failover_amphora(AMP_ID)
mock_lb_update.assert_not_called()
mock_perform_amp_failover.assert_called_once_with(
amphora, constants.LB_CREATE_FAILOVER_PRIORITY)
@mock.patch('octavia.db.repositories.AmphoraHealthRepository.update')
def test_failover_deleted_amphora(self,
mock_update,
@ -1163,7 +1245,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw._perform_amphora_failover(mock_amphora, 10)
mock_update.assert_called_with('TEST', AMP_ID, busy=True)
mock_update.assert_called_with(_db_session, AMP_ID, busy=True)
mock_taskflow_load.assert_not_called()
@mock.patch('octavia.controller.worker.'
@ -1192,7 +1274,7 @@ class TestControllerWorker(base.TestCase):
cw.failover_loadbalancer('123')
mock_perform.assert_called_with(
_amphora_mock2, constants.LB_CREATE_ADMIN_FAILOVER_PRIORITY)
mock_update.assert_called_with('TEST', '123',
mock_update.assert_called_with(_db_session, '123',
provisioning_status=constants.ACTIVE)
mock_perform.reset
@ -1204,13 +1286,13 @@ class TestControllerWorker(base.TestCase):
# is the last one
mock_perform.assert_called_with(
_amphora_mock, constants.LB_CREATE_ADMIN_FAILOVER_PRIORITY)
mock_update.assert_called_with('TEST', '123',
mock_update.assert_called_with(_db_session, '123',
provisioning_status=constants.ACTIVE)
mock_perform.reset
mock_perform.side_effect = OverflowError()
self.assertRaises(OverflowError, cw.failover_loadbalancer, 123)
mock_update.assert_called_with('TEST', 123,
mock_update.assert_called_with(_db_session, 123,
provisioning_status=constants.ERROR)
@mock.patch('octavia.controller.worker.flows.'
@ -1255,7 +1337,7 @@ class TestControllerWorker(base.TestCase):
}))
_flow_mock.run.assert_called_once_with()
mock_update.assert_called_with('TEST', LB_ID,
mock_update.assert_called_with(_db_session, LB_ID,
provisioning_status=constants.ACTIVE)
@mock.patch('octavia.controller.worker.flows.'

View File

@ -0,0 +1,5 @@
---
fixes:
- |
Fixes an issue where if more than one amphora fails at the same time,
failover might not fully complete, leaving the load balancer in ERROR.

View File

@ -53,9 +53,12 @@ def generate(flow_list, output_directory):
get_flow_method = getattr(current_instance, current_tuple[2])
if (current_tuple[1] == 'AmphoraFlows' and
current_tuple[2] == 'get_failover_flow'):
amp1 = dmh.generate_amphora()
amp2 = dmh.generate_amphora()
lb = dmh.generate_load_balancer(amphorae=[amp1, amp2])
current_engine = engines.load(
get_flow_method(role=constants.ROLE_STANDALONE,
status=constants.AMPHORA_ALLOCATED))
load_balancer=lb))
elif (current_tuple[1] == 'LoadBalancerFlows' and
current_tuple[2] == 'get_create_load_balancer_flow'):
current_engine = engines.load(