Merge "Fix failover when multiple amphora have failed"

This commit is contained in:
Zuul 2018-07-27 23:22:30 +00:00 committed by Gerrit Code Review
commit fbefbcd843
23 changed files with 725 additions and 80 deletions

View File

@ -198,6 +198,12 @@
#
# rest_request_conn_timeout = 10
# rest_request_read_timeout = 60
#
# These "active" timeouts are used once the amphora should already
# be fully up and active. These values are lower than the other values to
# facilitate "fail fast" scenarios like failovers
# active_connection_max_retries = 15
# active_connection_rety_interval = 2
[controller_worker]
# workers = 1

View File

@ -21,6 +21,25 @@ import six
@six.add_metaclass(abc.ABCMeta)
class AmphoraLoadBalancerDriver(object):
@abc.abstractmethod
def update_amphora_listeners(self, listeners, amphora_id, timeout_dict):
"""Update the amphora with a new configuration.
:param listeners: List of listeners to update.
:type listener: list
:param amphora_id: The ID of the amphora to update
:type amphora_id: string
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
:returns: None
Builds a new configuration, pushes it to the amphora, and reloads
the listener on one amphora.
"""
pass
@abc.abstractmethod
def update(self, listener, vip):
"""Update the amphora with a new configuration.
@ -30,7 +49,7 @@ class AmphoraLoadBalancerDriver(object):
:type listener: object
:param vip: vip object, need to use its ip_address property
:type vip: object
:returns: return a value list (listener, vip, status flag--update)
:returns: None
At this moment, we just build the basic structure for testing, will
add more function along with the development.

View File

@ -60,6 +60,45 @@ class HaproxyAmphoraLoadBalancerDriver(
haproxy_template=CONF.haproxy_amphora.haproxy_template,
connection_logging=CONF.haproxy_amphora.connection_logging)
def update_amphora_listeners(self, listeners, amphora_index,
amphorae, timeout_dict=None):
"""Update the amphora with a new configuration.
:param listeners: List of listeners to update.
:type listener: list
:param amphora_id: The ID of the amphora to update
:type amphora_id: string
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
:returns: None
Updates the configuration of the listeners on a single amphora.
"""
# if the amphora does not yet have listeners, no need to update them.
if not listeners:
LOG.debug('No listeners found to update.')
return
amp = amphorae[amphora_index]
if amp is None or amp.status == consts.DELETED:
return
# TODO(johnsom) remove when we don't have a process per listener
for listener in listeners:
LOG.debug("%s updating listener %s on amphora %s",
self.__class__.__name__, listener.id, amp.id)
certs = self._process_tls_certificates(listener)
# Generate HaProxy configuration from listener object
config = self.jinja.build_config(
host_amphora=amp,
listener=listener,
tls_cert=certs['tls_cert'],
user_group=CONF.haproxy_amphora.user_group)
self.client.upload_config(amp, listener.id, config,
timeout_dict=timeout_dict)
self.client.reload_listener(amp, listener.id,
timeout_dict=timeout_dict)
def update(self, listener, vip):
LOG.debug("Amphora %s haproxy, updating listener %s, vip %s",
self.__class__.__name__, listener.protocol_port,
@ -85,25 +124,29 @@ class HaproxyAmphoraLoadBalancerDriver(
self.__class__.__name__, amp.id)
self.client.update_cert_for_rotation(amp, pem)
def _apply(self, func, listener=None, *args):
for amp in listener.load_balancer.amphorae:
if amp.status != consts.DELETED:
func(amp, listener.id, *args)
def _apply(self, func, listener=None, amphora=None, *args):
if amphora is None:
for amp in listener.load_balancer.amphorae:
if amp.status != consts.DELETED:
func(amp, listener.id, *args)
else:
if amphora.status != consts.DELETED:
func(amphora, listener.id, *args)
def stop(self, listener, vip):
self._apply(self.client.stop_listener, listener)
def start(self, listener, vip):
self._apply(self.client.start_listener, listener)
def start(self, listener, vip, amphora=None):
self._apply(self.client.start_listener, listener, amphora)
def delete(self, listener, vip):
self._apply(self.client.delete_listener, listener)
def get_info(self, amphora):
self.driver.get_info(amphora.lb_network_ip)
return self.client.get_info(amphora)
def get_diagnostics(self, amphora):
self.driver.get_diagnostics(amphora.lb_network_ip)
pass
def finalize_amphora(self, amphora):
pass
@ -186,7 +229,7 @@ class HaproxyAmphoraLoadBalancerDriver(
pem = cert_parser.build_pem(cert)
md5 = hashlib.md5(pem).hexdigest() # nosec
name = '{id}.pem'.format(id=cert.id)
self._apply(self._upload_cert, listener, pem, md5, name)
self._apply(self._upload_cert, listener, None, pem, md5, name)
return {'tls_cert': tls_cert, 'sni_certs': sni_certs}
@ -252,17 +295,27 @@ class AmphoraAPIClient(object):
port=CONF.haproxy_amphora.bind_port,
version=API_VERSION)
def request(self, method, amp, path='/', **kwargs):
def request(self, method, amp, path='/', timeout_dict=None, **kwargs):
cfg_ha_amp = CONF.haproxy_amphora
if timeout_dict is None:
timeout_dict = {}
req_conn_timeout = timeout_dict.get(
consts.REQ_CONN_TIMEOUT, cfg_ha_amp.rest_request_conn_timeout)
req_read_timeout = timeout_dict.get(
consts.REQ_READ_TIMEOUT, cfg_ha_amp.rest_request_read_timeout)
conn_max_retries = timeout_dict.get(
consts.CONN_MAX_RETRIES, cfg_ha_amp.connection_max_retries)
conn_retry_interval = timeout_dict.get(
consts.CONN_RETRY_INTERVAL, cfg_ha_amp.connection_retry_interval)
LOG.debug("request url %s", path)
_request = getattr(self.session, method.lower())
_url = self._base_url(amp.lb_network_ip) + path
LOG.debug("request url %s", _url)
timeout_tuple = (CONF.haproxy_amphora.rest_request_conn_timeout,
CONF.haproxy_amphora.rest_request_read_timeout)
reqargs = {
'verify': CONF.haproxy_amphora.server_ca,
'url': _url,
'timeout': timeout_tuple, }
'timeout': (req_conn_timeout, req_read_timeout), }
reqargs.update(kwargs)
headers = reqargs.setdefault('headers', {})
@ -270,7 +323,7 @@ class AmphoraAPIClient(object):
self.ssl_adapter.uuid = amp.id
exception = None
# Keep retrying
for a in six.moves.xrange(CONF.haproxy_amphora.connection_max_retries):
for a in six.moves.xrange(conn_max_retries):
try:
with warnings.catch_warnings():
warnings.filterwarnings(
@ -304,20 +357,20 @@ class AmphoraAPIClient(object):
except (requests.ConnectionError, requests.Timeout) as e:
exception = e
LOG.warning("Could not connect to instance. Retrying.")
time.sleep(CONF.haproxy_amphora.connection_retry_interval)
time.sleep(conn_retry_interval)
LOG.error("Connection retries (currently set to %(max_retries)s) "
"exhausted. The amphora is unavailable. Reason: "
"%(exception)s",
{'max_retries': CONF.haproxy_amphora.connection_max_retries,
{'max_retries': conn_max_retries,
'exception': exception})
raise driver_except.TimeOutException()
def upload_config(self, amp, listener_id, config):
def upload_config(self, amp, listener_id, config, timeout_dict=None):
r = self.put(
amp,
'listeners/{amphora_id}/{listener_id}/haproxy'.format(
amphora_id=amp.id, listener_id=listener_id),
amphora_id=amp.id, listener_id=listener_id), timeout_dict,
data=config)
return exc.check_exception(r)
@ -329,9 +382,9 @@ class AmphoraAPIClient(object):
return r.json()
return None
def _action(self, action, amp, listener_id):
def _action(self, action, amp, listener_id, timeout_dict=None):
r = self.put(amp, 'listeners/{listener_id}/{action}'.format(
listener_id=listener_id, action=action))
listener_id=listener_id, action=action), timeout_dict=timeout_dict)
return exc.check_exception(r)
def upload_cert_pem(self, amp, listener_id, pem_filename, pem_file):
@ -404,8 +457,9 @@ class AmphoraAPIClient(object):
r = self.put(amp, 'vrrp/{action}'.format(action=action))
return exc.check_exception(r)
def get_interface(self, amp, ip_addr):
r = self.get(amp, 'interface/{ip_addr}'.format(ip_addr=ip_addr))
def get_interface(self, amp, ip_addr, timeout_dict=None):
r = self.get(amp, 'interface/{ip_addr}'.format(ip_addr=ip_addr),
timeout_dict=timeout_dict)
if exc.check_exception(r):
return r.json()
return None

View File

@ -90,5 +90,6 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
self.client.reload_vrrp(amp)
def get_vrrp_interface(self, amphora):
return self.client.get_interface(amphora, amphora.vrrp_ip)['interface']
def get_vrrp_interface(self, amphora, timeout_dict=None):
return self.client.get_interface(
amphora, amphora.vrrp_ip, timeout_dict=timeout_dict)['interface']

View File

@ -37,6 +37,14 @@ class NoopManager(object):
super(NoopManager, self).__init__()
self.amphoraconfig = {}
def update_amphora_listeners(self, listeners, amphora_id, timeout_dict):
for listener in listeners:
LOG.debug("Amphora noop driver update_amphora_listeners, "
"listener %s, amphora %s, timeouts %s", listener.id,
amphora_id, timeout_dict)
self.amphoraconfig[(listener.id, amphora_id)] = (
listener, amphora_id, timeout_dict, "update_amp")
def update(self, listener, vip):
LOG.debug("Amphora %s no-op, update listener %s, vip %s",
self.__class__.__name__, listener.protocol_port,
@ -106,6 +114,11 @@ class NoopAmphoraLoadBalancerDriver(
super(NoopAmphoraLoadBalancerDriver, self).__init__()
self.driver = NoopManager()
def update_amphora_listeners(self, listeners, amphora_id, timeout_dict):
self.driver.update_amphora_listeners(listeners, amphora_id,
timeout_dict)
def update(self, listener, vip):
self.driver.update(listener, vip)

View File

@ -240,6 +240,13 @@ haproxy_amphora_opts = [
default=5,
help=_('Retry timeout between connection attempts in '
'seconds.')),
cfg.IntOpt('active_connection_max_retries',
default=15,
help=_('Retry threshold for connecting to active amphorae.')),
cfg.IntOpt('active_connection_rety_interval',
default=2,
help=_('Retry timeout between connection attempts in '
'seconds for active amphora.')),
cfg.IntOpt('build_rate_limit',
default=-1,
help=_('Number of amphorae that could be built per controller'

View File

@ -195,6 +195,7 @@ FAILED_AMPHORA = 'failed_amphora'
FAILOVER_AMPHORA = 'failover_amphora'
AMPHORAE = 'amphorae'
AMPHORA_ID = 'amphora_id'
AMPHORA_INDEX = 'amphora_index'
FAILOVER_AMPHORA_ID = 'failover_amphora_id'
DELTA = 'delta'
DELTAS = 'deltas'
@ -247,6 +248,11 @@ MEMBER_UPDATES = 'member_updates'
HEALTH_MONITOR_UPDATES = 'health_monitor_updates'
L7POLICY_UPDATES = 'l7policy_updates'
L7RULE_UPDATES = 'l7rule_updates'
TIMEOUT_DICT = 'timeout_dict'
REQ_CONN_TIMEOUT = 'req_conn_timeout'
REQ_READ_TIMEOUT = 'req_read_timeout'
CONN_MAX_RETRIES = 'conn_max_retries'
CONN_RETRY_INTERVAL = 'conn_retry_interval'
CERT_ROTATE_AMPHORA_FLOW = 'octavia-cert-rotate-amphora-flow'
CREATE_AMPHORA_FLOW = 'octavia-create-amphora-flow'
@ -280,6 +286,7 @@ UPDATE_MEMBER_FLOW = 'octavia-update-member-flow'
UPDATE_POOL_FLOW = 'octavia-update-pool-flow'
UPDATE_L7POLICY_FLOW = 'octavia-update-l7policy-flow'
UPDATE_L7RULE_FLOW = 'octavia-update-l7rule-flow'
UPDATE_AMPS_SUBFLOW = 'octavia-update-amps-subflow'
POST_MAP_AMP_TO_LB_SUBFLOW = 'octavia-post-map-amp-to-lb-subflow'
CREATE_AMP_FOR_LB_SUBFLOW = 'octavia-create-amp-for-lb-subflow'
@ -313,6 +320,8 @@ AMP_VRRP_STOP = 'octavia-amphora-vrrp-stop'
AMP_UPDATE_VRRP_INTF = 'octavia-amphora-update-vrrp-intf'
CREATE_VRRP_GROUP_FOR_LB = 'octavia-create-vrrp-group-for-lb'
CREATE_VRRP_SECURITY_RULES = 'octavia-create-vrrp-security-rules'
AMP_COMPUTE_CONNECTIVITY_WAIT = 'octavia-amp-compute-connectivity-wait'
AMP_LISTENER_UPDATE = 'octavia-amp-listeners-update'
GENERATE_SERVER_PEM_TASK = 'GenerateServerPEMTask'

View File

@ -828,15 +828,14 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
# if we run with anti-affinity we need to set the server group
# as well
if CONF.nova.enable_anti_affinity:
lb = self._amphora_repo.get_lb_for_amphora(
db_apis.get_session(), amp.id)
if lb:
stored_params[constants.SERVER_GROUP_ID] = lb.server_group_id
lb = self._amphora_repo.get_lb_for_amphora(
db_apis.get_session(), amp.id)
if CONF.nova.enable_anti_affinity and lb:
stored_params[constants.SERVER_GROUP_ID] = lb.server_group_id
failover_amphora_tf = self._taskflow_load(
self._amphora_flows.get_failover_flow(
role=amp.role, load_balancer_id=amp.load_balancer_id),
role=amp.role, load_balancer=lb),
store=stored_params)
with tf_logging.DynamicLoggingListener(

View File

@ -16,6 +16,7 @@
from oslo_config import cfg
from taskflow.patterns import graph_flow
from taskflow.patterns import linear_flow
from taskflow.patterns import unordered_flow
from octavia.common import constants
from octavia.controller.worker.tasks import amphora_driver_tasks
@ -62,12 +63,15 @@ class AmphoraFlows(object):
provides=constants.COMPUTE_ID))
create_amphora_flow.add(database_tasks.MarkAmphoraBootingInDB(
requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
create_amphora_flow.add(compute_tasks.ComputeWait(
create_amphora_flow.add(compute_tasks.ComputeActiveWait(
requires=(constants.COMPUTE_ID, constants.AMPHORA_ID),
provides=constants.COMPUTE_OBJ))
create_amphora_flow.add(database_tasks.UpdateAmphoraInfo(
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
provides=constants.AMPHORA))
create_amphora_flow.add(
amphora_driver_tasks.AmphoraComputeConnectivityWait(
requires=constants.AMPHORA))
create_amphora_flow.add(database_tasks.ReloadAmphora(
requires=constants.AMPHORA_ID,
provides=constants.AMPHORA))
@ -172,7 +176,7 @@ class AmphoraFlows(object):
create_amp_for_lb_subflow.add(database_tasks.MarkAmphoraBootingInDB(
name=sf_name + '-' + constants.MARK_AMPHORA_BOOTING_INDB,
requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
create_amp_for_lb_subflow.add(compute_tasks.ComputeWait(
create_amp_for_lb_subflow.add(compute_tasks.ComputeActiveWait(
name=sf_name + '-' + constants.COMPUTE_WAIT,
requires=(constants.COMPUTE_ID, constants.AMPHORA_ID),
provides=constants.COMPUTE_OBJ))
@ -180,6 +184,10 @@ class AmphoraFlows(object):
name=sf_name + '-' + constants.UPDATE_AMPHORA_INFO,
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
provides=constants.AMPHORA))
create_amp_for_lb_subflow.add(
amphora_driver_tasks.AmphoraComputeConnectivityWait(
name=sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT,
requires=constants.AMPHORA))
create_amp_for_lb_subflow.add(amphora_driver_tasks.AmphoraFinalize(
name=sf_name + '-' + constants.AMPHORA_FINALIZE,
requires=constants.AMPHORA))
@ -290,7 +298,7 @@ class AmphoraFlows(object):
return delete_amphora_flow
def get_failover_flow(self, role=constants.ROLE_STANDALONE,
load_balancer_id=None):
load_balancer=None):
"""Creates a flow to failover a stale amphora
:returns: The flow for amphora failover
@ -334,7 +342,7 @@ class AmphoraFlows(object):
requires=constants.AMPHORA))
# If this is an unallocated amp (spares pool), we're done
if not load_balancer_id:
if not load_balancer:
failover_amphora_flow.add(
database_tasks.DisableAmphoraHealthMonitoring(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
@ -373,9 +381,38 @@ class AmphoraFlows(object):
provides=constants.AMPHORAE_NETWORK_CONFIG))
failover_amphora_flow.add(database_tasks.GetListenersFromLoadbalancer(
requires=constants.LOADBALANCER, provides=constants.LISTENERS))
failover_amphora_flow.add(database_tasks.GetAmphoraeFromLoadbalancer(
requires=constants.LOADBALANCER, provides=constants.AMPHORAE))
failover_amphora_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=(constants.LOADBALANCER, constants.LISTENERS)))
# Listeners update needs to be run on all amphora to update
# their peer configurations. So parallelize this with an
# unordered subflow.
update_amps_subflow = unordered_flow.Flow(
constants.UPDATE_AMPS_SUBFLOW)
timeout_dict = {
constants.CONN_MAX_RETRIES:
CONF.haproxy_amphora.active_connection_max_retries,
constants.CONN_RETRY_INTERVAL:
CONF.haproxy_amphora.active_connection_rety_interval}
# Setup parallel flows for each amp. We don't know the new amp
# details at flow creation time, so setup a subflow for each
# amp on the LB, they let the task index into a list of amps
# to find the amphora it should work on.
amp_index = 0
for amp in load_balancer.amphorae:
if amp.status == constants.DELETED:
continue
update_amps_subflow.add(
amphora_driver_tasks.AmpListenersUpdate(
name=constants.AMP_LISTENER_UPDATE + '-' + str(amp_index),
requires=(constants.LISTENERS, constants.AMPHORAE),
inject={constants.AMPHORA_INDEX: amp_index,
constants.TIMEOUT_DICT: timeout_dict}))
amp_index += 1
failover_amphora_flow.add(update_amps_subflow)
# Plug the VIP ports into the new amphora
failover_amphora_flow.add(network_tasks.PlugVIPPort(
@ -385,13 +422,22 @@ class AmphoraFlows(object):
constants.AMPHORAE_NETWORK_CONFIG)))
# Plug the member networks into the new amphora
failover_amphora_flow.add(network_tasks.CalculateDelta(
requires=constants.LOADBALANCER, provides=constants.DELTAS))
failover_amphora_flow.add(network_tasks.HandleNetworkDeltas(
requires=constants.DELTAS, provides=constants.ADDED_PORTS))
failover_amphora_flow.add(network_tasks.CalculateAmphoraDelta(
requires=(constants.LOADBALANCER, constants.AMPHORA),
provides=constants.DELTA))
failover_amphora_flow.add(network_tasks.HandleNetworkDelta(
requires=(constants.AMPHORA, constants.DELTA),
provides=constants.ADDED_PORTS))
failover_amphora_flow.add(amphora_driver_tasks.AmphoraePostNetworkPlug(
requires=(constants.LOADBALANCER, constants.ADDED_PORTS)))
failover_amphora_flow.add(database_tasks.ReloadLoadBalancer(
name='octavia-failover-LB-reload-2',
requires=constants.LOADBALANCER_ID,
provides=constants.LOADBALANCER))
# Handle the amphora role and VRRP if necessary
if role == constants.ROLE_MASTER:
failover_amphora_flow.add(database_tasks.MarkAmphoraMasterInDB(
@ -412,7 +458,8 @@ class AmphoraFlows(object):
requires=constants.AMPHORA))
failover_amphora_flow.add(amphora_driver_tasks.ListenersStart(
requires=(constants.LOADBALANCER, constants.LISTENERS)))
requires=(constants.LOADBALANCER, constants.LISTENERS,
constants.AMPHORA)))
failover_amphora_flow.add(
database_tasks.DisableAmphoraHealthMonitoring(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},

View File

@ -20,6 +20,7 @@ from stevedore import driver as stevedore_driver
from taskflow import task
from taskflow.types import failure
from octavia.amphorae.driver_exceptions import exceptions as driver_except
from octavia.common import constants
from octavia.controller.worker import task_utils as task_utilities
from octavia.db import api as db_apis
@ -45,6 +46,25 @@ class BaseAmphoraTask(task.Task):
self.task_utils = task_utilities.TaskUtils()
class AmpListenersUpdate(BaseAmphoraTask):
"""Task to update the listeners on one amphora."""
def execute(self, listeners, amphora_index, amphorae, timeout_dict=()):
# Note, we don't want this to cause a revert as it may be used
# in a failover flow with both amps failing. Skip it and let
# health manager fix it.
try:
self.amphora_driver.update_amphora_listeners(
listeners, amphora_index, amphorae, timeout_dict)
except Exception as e:
amphora_id = amphorae[amphora_index].id
LOG.error('Failed to update listeners on amphora %s. Skipping '
'this amphora as it is failing to update due to: %s',
amphora_id, str(e))
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
class ListenersUpdate(BaseAmphoraTask):
"""Task to update amphora with all specified listeners' configurations."""
@ -104,10 +124,10 @@ class ListenerStart(BaseAmphoraTask):
class ListenersStart(BaseAmphoraTask):
"""Task to start all listeners on the vip."""
def execute(self, loadbalancer, listeners):
def execute(self, loadbalancer, listeners, amphora=None):
"""Execute listener start routines for listeners on an amphora."""
for listener in listeners:
self.amphora_driver.start(listener, loadbalancer.vip)
self.amphora_driver.start(listener, loadbalancer.vip, amphora)
LOG.debug("Started the listeners on the vip")
def revert(self, listeners, *args, **kwargs):
@ -261,11 +281,27 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask):
def execute(self, loadbalancer):
"""Execute post_vip_routine."""
amps = []
timeout_dict = {
constants.CONN_MAX_RETRIES:
CONF.haproxy_amphora.active_connection_max_retries,
constants.CONN_RETRY_INTERVAL:
CONF.haproxy_amphora.active_connection_rety_interval}
for amp in six.moves.filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
# Currently this is supported only with REST Driver
interface = self.amphora_driver.get_vrrp_interface(amp)
try:
interface = self.amphora_driver.get_vrrp_interface(
amp, timeout_dict=timeout_dict)
except Exception as e:
# This can occur when an active/standby LB has no listener
LOG.error('Failed to get amphora VRRP interface on amphora '
'%s. Skipping this amphora as it is failing due to: '
'%s', amp.id, str(e))
self.amphora_repo.update(db_apis.get_session(), amp.id,
status=constants.ERROR)
continue
self.amphora_repo.update(db_apis.get_session(), amp.id,
vrrp_interface=interface)
amps.append(self.amphora_repo.get(db_apis.get_session(),
@ -317,3 +353,22 @@ class AmphoraVRRPStart(BaseAmphoraTask):
self.amphora_driver.start_vrrp_service(loadbalancer)
LOG.debug("Started VRRP of loadbalancer %s amphorae",
loadbalancer.id)
class AmphoraComputeConnectivityWait(BaseAmphoraTask):
""""Task to wait for the compute instance to be up."""
def execute(self, amphora):
"""Execute get_info routine for an amphora until it responds."""
try:
amp_info = self.amphora_driver.get_info(amphora)
LOG.debug('Successfuly connected to amphora %s: %s',
amphora.id, amp_info)
except driver_except.TimeOutException:
LOG.error("Amphora compute instance failed to become reachable. "
"This either means the compute driver failed to fully "
"boot the instance inside the timeout interval or the "
"instance is not reachable via the lb-mgmt-net.")
self.amphora_repo.update(db_apis.get_session(), amphora.id,
status=constants.ERROR)
raise

View File

@ -175,7 +175,7 @@ class ComputeDelete(BaseComputeTask):
raise
class ComputeWait(BaseComputeTask):
class ComputeActiveWait(BaseComputeTask):
"""Wait for the compute driver to mark the amphora active."""
def execute(self, compute_id, amphora_id):

View File

@ -1525,6 +1525,25 @@ class GetAmphoraDetails(BaseDatabaseTask):
vrrp_priority=amphora.vrrp_priority)
class GetAmphoraeFromLoadbalancer(BaseDatabaseTask):
"""Task to pull the listeners from a loadbalancer."""
def execute(self, loadbalancer):
"""Pull the amphorae from a loadbalancer.
:param loadbalancer: Load balancer which listeners are required
:returns: A list of Listener objects
"""
amphorae = []
for amp in loadbalancer.amphorae:
a = self.amphora_repo.get(db_apis.get_session(), id=amp.id,
show_deleted=False)
if a is None:
continue
amphorae.append(a)
return amphorae
class GetListenersFromLoadbalancer(BaseDatabaseTask):
"""Task to pull the listeners from a loadbalancer."""
@ -1537,6 +1556,7 @@ class GetListenersFromLoadbalancer(BaseDatabaseTask):
listeners = []
for listener in loadbalancer.listeners:
l = self.listener_repo.get(db_apis.get_session(), id=listener.id)
l.load_balancer = loadbalancer
listeners.append(l)
return listeners

View File

@ -215,6 +215,55 @@ class GetMemberPorts(BaseNetworkTask):
return member_ports
class HandleNetworkDelta(BaseNetworkTask):
"""Task to plug and unplug networks
Plug or unplug networks based on delta
"""
def execute(self, amphora, delta):
"""Handle network plugging based off deltas."""
added_ports = {}
added_ports[amphora.id] = []
for nic in delta.add_nics:
interface = self.network_driver.plug_network(delta.compute_id,
nic.network_id)
port = self.network_driver.get_port(interface.port_id)
port.network = self.network_driver.get_network(port.network_id)
for fixed_ip in port.fixed_ips:
fixed_ip.subnet = self.network_driver.get_subnet(
fixed_ip.subnet_id)
added_ports[amphora.id].append(port)
for nic in delta.delete_nics:
try:
self.network_driver.unplug_network(delta.compute_id,
nic.network_id)
except base.NetworkNotFound:
LOG.debug("Network %d not found ", nic.network_id)
except Exception:
LOG.exception("Unable to unplug network")
return added_ports
def revert(self, result, amphora, delta, *args, **kwargs):
"""Handle a network plug or unplug failures."""
if isinstance(result, failure.Failure):
return
if not delta:
return
LOG.warning("Unable to plug networks for amp id %s",
delta.amphora_id)
for nic in delta.add_nics:
try:
self.network_driver.unplug_network(delta.compute_id,
nic.network_id)
except Exception:
pass
class HandleNetworkDeltas(BaseNetworkTask):
"""Task to plug and unplug networks

View File

@ -24,6 +24,7 @@ import six
from octavia.amphorae.driver_exceptions import exceptions as driver_except
from octavia.amphorae.drivers.haproxy import exceptions as exc
from octavia.amphorae.drivers.haproxy import rest_api_driver as driver
from octavia.common import constants
from octavia.db import models
from octavia.network import data_models as network_models
from octavia.tests.unit import base
@ -47,6 +48,9 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
def setUp(self):
super(TestHaproxyAmphoraLoadBalancerDriverTest, self).setUp()
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="haproxy_amphora", user_group="everyone")
DEST1 = '198.51.100.0/24'
DEST2 = '203.0.113.0/24'
NEXTHOP = '192.0.2.1'
@ -84,6 +88,50 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
'mtu': FAKE_MTU,
'host_routes': host_routes_data}
self.timeout_dict = {constants.REQ_CONN_TIMEOUT: 1,
constants.REQ_READ_TIMEOUT: 2,
constants.CONN_MAX_RETRIES: 3,
constants.CONN_RETRY_INTERVAL: 4}
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
def test_update_amphora_listeners(self, mock_load_cert):
mock_amphora = mock.MagicMock()
mock_amphora.id = uuidutils.generate_uuid()
mock_listener = mock.MagicMock()
mock_listener.id = uuidutils.generate_uuid()
mock_load_cert.return_value = {'tls_cert': None, 'sni_certs': []}
self.driver.jinja.build_config.return_value = 'the_config'
self.driver.update_amphora_listeners(None, 1, [],
self.timeout_dict)
mock_load_cert.assert_not_called()
self.driver.jinja.build_config.assert_not_called()
self.driver.client.upload_config.assert_not_called()
self.driver.client.reload_listener.assert_not_called()
self.driver.update_amphora_listeners([mock_listener], 0,
[mock_amphora], self.timeout_dict)
self.driver.jinja.build_config.assert_called_once_with(
host_amphora=mock_amphora, listener=mock_listener,
tls_cert=None, user_group="everyone")
self.driver.client.upload_config.assert_called_once_with(
mock_amphora, mock_listener.id, 'the_config',
timeout_dict=self.timeout_dict)
self.driver.client.reload_listener(mock_amphora, mock_listener.id,
timeout_dict=self.timeout_dict)
mock_load_cert.reset_mock()
self.driver.jinja.build_config.reset_mock()
self.driver.client.upload_config.reset_mock()
self.driver.client.reload_listener.reset_mock()
mock_amphora.status = constants.DELETED
self.driver.update_amphora_listeners([mock_listener], 0,
[mock_amphora], self.timeout_dict)
mock_load_cert.assert_not_called()
self.driver.jinja.build_config.assert_not_called()
self.driver.client.upload_config.assert_not_called()
self.driver.client.reload_listener.assert_not_called()
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
@mock.patch('octavia.common.tls_utils.cert_parser.get_host_names')
def test_update(self, mock_cert, mock_load_crt):
@ -158,11 +206,29 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
self.amp, self.sl.id)
def test_start(self):
amp1 = mock.MagicMock()
amp2 = mock.MagicMock()
amp2.status = constants.DELETED
listener = mock.MagicMock()
listener.id = uuidutils.generate_uuid()
listener.load_balancer.amphorae = [amp1, amp2]
# Execute driver method
self.driver.start(self.sl, self.sv)
self.driver.start(listener, self.sv)
self.driver.client.start_listener.assert_called_once_with(
amp1, listener.id)
def test_start_with_amphora(self):
# Execute driver method
amp = mock.MagicMock()
self.driver.start(self.sl, self.sv, self.amp)
self.driver.client.start_listener.assert_called_once_with(
self.amp, self.sl.id)
self.driver.client.start_listener.reset_mock()
amp.status = constants.DELETED
self.driver.start(self.sl, self.sv, amp)
self.driver.client.start_listener.assert_not_called()
def test_delete(self):
# Execute driver method
self.driver.delete(self.sl, self.sv)
@ -170,13 +236,19 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
self.amp, self.sl.id)
def test_get_info(self):
pass
self.driver.client.get_info.return_value = 'FAKE_INFO'
result = self.driver.get_info(self.amp)
self.assertEqual('FAKE_INFO', result)
def test_get_diagnostics(self):
pass
# TODO(johnsom) Implement once this exists on the amphora agent.
result = self.driver.get_diagnostics(self.amp)
self.assertIsNone(result)
def test_finalize_amphora(self):
pass
# TODO(johnsom) Implement once this exists on the amphora agent.
result = self.driver.finalize_amphora(self.amp)
self.assertIsNone(result)
def test_post_vip_plug(self):
amphorae_network_config = mock.MagicMock()
@ -250,7 +322,7 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
def test_get_vrrp_interface(self):
self.driver.get_vrrp_interface(self.amp)
self.driver.client.get_interface.assert_called_once_with(
self.amp, self.amp.vrrp_ip)
self.amp, self.amp.vrrp_ip, timeout_dict=None)
class TestAmphoraAPIClientTest(base.TestCase):
@ -271,6 +343,10 @@ class TestAmphoraAPIClientTest(base.TestCase):
'vrrp_ip': self.amp.vrrp_ip}
patcher = mock.patch('time.sleep').start()
self.addCleanup(patcher.stop)
self.timeout_dict = {constants.REQ_CONN_TIMEOUT: 1,
constants.REQ_READ_TIMEOUT: 2,
constants.CONN_MAX_RETRIES: 3,
constants.CONN_RETRY_INTERVAL: 4}
def test_base_url(self):
url = self.driver._base_url(FAKE_IP)
@ -284,8 +360,8 @@ class TestAmphoraAPIClientTest(base.TestCase):
@mock.patch('octavia.amphorae.drivers.haproxy.rest_api_driver.time.sleep')
def test_request(self, mock_sleep, mock_get):
self.assertRaises(driver_except.TimeOutException,
self.driver.request,
'get', self.amp, 'unavailableURL')
self.driver.request, 'get', self.amp,
'unavailableURL', self.timeout_dict)
@requests_mock.mock()
def test_get_info(self, m):

View File

@ -15,6 +15,7 @@
from oslo_utils import uuidutils
from octavia.amphorae.drivers.noop_driver import driver
from octavia.common import constants
from octavia.common import data_models
from octavia.network import data_models as network_models
from octavia.tests.unit import base
@ -44,6 +45,7 @@ class TestNoopAmphoraLoadBalancerDriver(base.TestCase):
super(TestNoopAmphoraLoadBalancerDriver, self).setUp()
self.driver = driver.NoopAmphoraLoadBalancerDriver()
self.listener = data_models.Listener()
self.listener.id = uuidutils.generate_uuid()
self.listener.protocol_port = 80
self.vip = data_models.Vip()
self.vip.ip_address = "10.0.0.1"
@ -61,6 +63,19 @@ class TestNoopAmphoraLoadBalancerDriver(base.TestCase):
vip_subnet=network_models.Subnet(id=self.FAKE_UUID_1))
}
self.pem_file = 'test_pem_file'
self.timeout_dict = {constants.REQ_CONN_TIMEOUT: 1,
constants.REQ_READ_TIMEOUT: 2,
constants.CONN_MAX_RETRIES: 3,
constants.CONN_RETRY_INTERVAL: 4}
def test_update_amphora_listeners(self):
self.driver.update_amphora_listeners([self.listener], self.amphora.id,
self.timeout_dict)
self.assertEqual((self.listener, self.amphora.id, self.timeout_dict,
'update_amp'),
self.driver.driver.amphoraconfig[(
self.listener.id,
self.amphora.id)])
def test_update(self):
self.driver.update(self.listener, self.vip)

View File

@ -19,6 +19,7 @@ from oslo_config import fixture as oslo_fixture
from taskflow.patterns import linear_flow as flow
from octavia.common import constants
from octavia.common import data_models
from octavia.controller.worker.flows import amphora_flows
import octavia.tests.unit.base as base
@ -38,6 +39,11 @@ class TestAmphoraFlows(base.TestCase):
amphora_driver='amphora_haproxy_rest_driver')
self.conf.config(group="nova", enable_anti_affinity=False)
self.AmpFlow = amphora_flows.AmphoraFlows()
self.amp1 = data_models.Amphora(id=1)
self.amp2 = data_models.Amphora(id=2)
self.amp3 = data_models.Amphora(id=3, status=constants.DELETED)
self.lb = data_models.LoadBalancer(
id=4, amphorae=[self.amp1, self.amp2, self.amp3])
def test_get_create_amphora_flow(self, mock_get_net_driver):
@ -237,7 +243,7 @@ class TestAmphoraFlows(base.TestCase):
def test_get_failover_flow_allocated(self, mock_get_net_driver):
amp_flow = self.AmpFlow.get_failover_flow(
load_balancer_id='mylb')
load_balancer=self.lb)
self.assertIsInstance(amp_flow, flow.Flow)
@ -254,10 +260,10 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(3, len(amp_flow.requires))
self.assertEqual(11, len(amp_flow.provides))
self.assertEqual(12, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(
role=constants.ROLE_MASTER, load_balancer_id='mylb')
role=constants.ROLE_MASTER, load_balancer=self.lb)
self.assertIsInstance(amp_flow, flow.Flow)
@ -274,10 +280,10 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(3, len(amp_flow.requires))
self.assertEqual(11, len(amp_flow.provides))
self.assertEqual(12, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(
role=constants.ROLE_BACKUP, load_balancer_id='mylb')
role=constants.ROLE_BACKUP, load_balancer=self.lb)
self.assertIsInstance(amp_flow, flow.Flow)
@ -294,10 +300,10 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(3, len(amp_flow.requires))
self.assertEqual(11, len(amp_flow.provides))
self.assertEqual(12, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(
role='BOGUSROLE', load_balancer_id='mylb')
role='BOGUSROLE', load_balancer=self.lb)
self.assertIsInstance(amp_flow, flow.Flow)
@ -314,12 +320,11 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(3, len(amp_flow.requires))
self.assertEqual(11, len(amp_flow.provides))
self.assertEqual(12, len(amp_flow.provides))
def test_get_failover_flow_spare(self, mock_get_net_driver):
amp_flow = self.AmpFlow.get_failover_flow(
load_balancer_id=None)
amp_flow = self.AmpFlow.get_failover_flow()
self.assertIsInstance(amp_flow, flow.Flow)

View File

@ -14,9 +14,12 @@
#
import mock
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
from taskflow.types import failure
from octavia.amphorae.driver_exceptions import exceptions as driver_except
from octavia.common import constants
from octavia.common import data_models
from octavia.controller.worker.tasks import amphora_driver_tasks
@ -28,6 +31,8 @@ AMP_ID = uuidutils.generate_uuid()
COMPUTE_ID = uuidutils.generate_uuid()
LISTENER_ID = uuidutils.generate_uuid()
LB_ID = uuidutils.generate_uuid()
CONN_MAX_RETRIES = 10
CONN_RETRY_INTERVAL = 6
_amphora_mock = mock.MagicMock()
_amphora_mock.id = AMP_ID
@ -61,8 +66,42 @@ class TestAmphoraDriverTasks(base.TestCase):
_LB_mock.amphorae = [_amphora_mock]
_LB_mock.id = LB_ID
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="haproxy_amphora",
active_connection_max_retries=CONN_MAX_RETRIES)
conf.config(group="haproxy_amphora",
active_connection_rety_interval=CONN_RETRY_INTERVAL)
super(TestAmphoraDriverTasks, self).setUp()
def test_amp_listener_update(self,
mock_driver,
mock_generate_uuid,
mock_log,
mock_get_session,
mock_listener_repo_get,
mock_listener_repo_update,
mock_amphora_repo_update):
timeout_dict = {constants.REQ_CONN_TIMEOUT: 1,
constants.REQ_READ_TIMEOUT: 2,
constants.CONN_MAX_RETRIES: 3,
constants.CONN_RETRY_INTERVAL: 4}
amp_list_update_obj = amphora_driver_tasks.AmpListenersUpdate()
amp_list_update_obj.execute([_listener_mock], 0,
[_amphora_mock], timeout_dict)
mock_driver.update_amphora_listeners.assert_called_once_with(
[_listener_mock], 0, [_amphora_mock], timeout_dict)
mock_driver.update_amphora_listeners.side_effect = Exception('boom')
amp_list_update_obj.execute([_listener_mock], 0,
[_amphora_mock], timeout_dict)
mock_amphora_repo_update.assert_called_once_with(
_session_mock, AMP_ID, status=constants.ERROR)
def test_listener_update(self,
mock_driver,
mock_generate_uuid,
@ -480,10 +519,15 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_listener_repo_update,
mock_amphora_repo_update):
_LB_mock.amphorae = _amphorae_mock
timeout_dict = {constants.CONN_MAX_RETRIES: CONN_MAX_RETRIES,
constants.CONN_RETRY_INTERVAL: CONN_RETRY_INTERVAL}
amphora_update_vrrp_interface_obj = (
amphora_driver_tasks.AmphoraUpdateVRRPInterface())
amphora_update_vrrp_interface_obj.execute(_LB_mock)
mock_driver.get_vrrp_interface.assert_called_once_with(_amphora_mock)
mock_driver.get_vrrp_interface.assert_called_once_with(
_amphora_mock, timeout_dict=timeout_dict)
# Test revert
mock_driver.reset_mock()
@ -550,3 +594,22 @@ class TestAmphoraDriverTasks(base.TestCase):
amphora_driver_tasks.AmphoraVRRPStart())
amphora_vrrp_start_obj.execute(_LB_mock)
mock_driver.start_vrrp_service.assert_called_once_with(_LB_mock)
def test_amphora_compute_connectivity_wait(self,
mock_driver,
mock_generate_uuid,
mock_log,
mock_get_session,
mock_listener_repo_get,
mock_listener_repo_update,
mock_amphora_repo_update):
amp_compute_conn_wait_obj = (
amphora_driver_tasks.AmphoraComputeConnectivityWait())
amp_compute_conn_wait_obj.execute(_amphora_mock)
mock_driver.get_info.assert_called_once_with(_amphora_mock)
mock_driver.get_info.side_effect = driver_except.TimeOutException()
self.assertRaises(driver_except.TimeOutException,
amp_compute_conn_wait_obj.execute, _amphora_mock)
mock_amphora_repo_update.assert_called_once_with(
_session_mock, AMP_ID, status=constants.ERROR)

View File

@ -339,7 +339,7 @@ class TestComputeTasks(base.TestCase):
mock_driver.get_amphora.return_value = _amphora_mock, None
computewait = compute_tasks.ComputeWait()
computewait = compute_tasks.ComputeActiveWait()
computewait.execute(COMPUTE_ID, AMPHORA_ID)
mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID)
@ -366,7 +366,7 @@ class TestComputeTasks(base.TestCase):
mock_driver.get_amphora.return_value = _amphora_mock, None
computewait = compute_tasks.ComputeWait()
computewait = compute_tasks.ComputeActiveWait()
computewait.execute(COMPUTE_ID, AMPHORA_ID)
mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID)
@ -391,7 +391,7 @@ class TestComputeTasks(base.TestCase):
mock_driver.get_amphora.return_value = _amphora_mock, None
computewait = compute_tasks.ComputeWait()
computewait = compute_tasks.ComputeActiveWait()
computewait.execute(COMPUTE_ID, AMPHORA_ID)
mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID)

View File

@ -1766,6 +1766,59 @@ class TestDatabaseTasks(base.TestCase):
repo.AmphoraRepository.update.assert_called_once_with(
'TEST', AMP_ID, role=None, vrrp_priority=None)
@mock.patch('octavia.db.repositories.AmphoraRepository.get')
def test_get_amphorae_from_loadbalancer(self,
mock_amphora_get,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
amp1 = mock.MagicMock()
amp1.id = uuidutils.generate_uuid()
amp2 = mock.MagicMock()
amp2.id = uuidutils.generate_uuid()
lb = mock.MagicMock()
lb.amphorae = [amp1, amp2]
mock_amphora_get.side_effect = [_amphora_mock, None]
get_amps_from_lb_obj = database_tasks.GetAmphoraeFromLoadbalancer()
result = get_amps_from_lb_obj.execute(lb)
self.assertEqual([_amphora_mock], result)
@mock.patch('octavia.db.repositories.ListenerRepository.get')
def test_get_listeners_from_loadbalancer(self,
mock_listener_get,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mock_listener_get.return_value = _listener_mock
_loadbalancer_mock.listeners = [_listener_mock]
get_list_from_lb_obj = database_tasks.GetListenersFromLoadbalancer()
result = get_list_from_lb_obj.execute(_loadbalancer_mock)
mock_listener_get.assert_called_once_with('TEST', id=_listener_mock.id)
self.assertEqual([_listener_mock], result)
def test_get_vip_from_loadbalancer(self,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
_loadbalancer_mock.vip = _vip_mock
get_vip_from_lb_obj = database_tasks.GetVipFromLoadbalancer()
result = get_vip_from_lb_obj.execute(_loadbalancer_mock)
self.assertEqual(_vip_mock, result)
@mock.patch('octavia.db.repositories.VRRPGroupRepository.create')
def test_create_vrrp_group_for_lb(self,
mock_vrrp_group_create,

View File

@ -17,6 +17,7 @@ import mock
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
from taskflow.types import failure
from octavia.common import constants
from octavia.common import data_models as o_data_models
@ -296,6 +297,69 @@ class TestNetworkTasks(base.TestCase):
self.assertEqual([port_mock], ports)
def test_handle_network_delta(self, mock_get_net_driver):
mock_net_driver = mock.MagicMock()
mock_get_net_driver.return_value = mock_net_driver
nic1 = mock.MagicMock()
nic1.network_id = uuidutils.generate_uuid()
nic2 = mock.MagicMock()
nic2.network_id = uuidutils.generate_uuid()
interface1 = mock.MagicMock()
interface1.port_id = uuidutils.generate_uuid()
port1 = mock.MagicMock()
port1.network_id = uuidutils.generate_uuid()
fixed_ip = mock.MagicMock()
fixed_ip.subnet_id = uuidutils.generate_uuid()
port1.fixed_ips = [fixed_ip]
subnet = mock.MagicMock()
network = mock.MagicMock()
delta = data_models.Delta(amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=[nic1],
delete_nics=[nic2, nic2, nic2])
mock_net_driver.plug_network.return_value = interface1
mock_net_driver.get_port.return_value = port1
mock_net_driver.get_network.return_value = network
mock_net_driver.get_subnet.return_value = subnet
mock_net_driver.unplug_network.side_effect = [
None, net_base.NetworkNotFound, Exception]
handle_net_delta_obj = network_tasks.HandleNetworkDelta()
result = handle_net_delta_obj.execute(self.amphora_mock, delta)
mock_net_driver.plug_network.assert_called_once_with(
self.amphora_mock.compute_id, nic1.network_id)
mock_net_driver.get_port.assert_called_once_with(interface1.port_id)
mock_net_driver.get_network.assert_called_once_with(port1.network_id)
mock_net_driver.get_subnet.assert_called_once_with(fixed_ip.subnet_id)
self.assertEqual({self.amphora_mock.id: [port1]}, result)
mock_net_driver.unplug_network.assert_called_with(
self.amphora_mock.compute_id, nic2.network_id)
# Revert
delta2 = data_models.Delta(amphora_id=self.amphora_mock.id,
compute_id=self.amphora_mock.compute_id,
add_nics=[nic1, nic1],
delete_nics=[nic2, nic2, nic2])
mock_net_driver.unplug_network.reset_mock()
handle_net_delta_obj.revert(
failure.Failure.from_exception(Exception('boom')), None, None)
mock_net_driver.unplug_network.assert_not_called()
mock_net_driver.unplug_network.reset_mock()
handle_net_delta_obj.revert(None, None, None)
mock_net_driver.unplug_network.assert_not_called()
mock_net_driver.unplug_network.reset_mock()
handle_net_delta_obj.revert(None, None, delta2)
def test_handle_network_deltas(self, mock_get_net_driver):
mock_driver = mock.MagicMock()
mock_get_net_driver.return_value = mock_driver

View File

@ -56,10 +56,20 @@ _l7rule_mock = mock.MagicMock()
_create_map_flow_mock = mock.MagicMock()
_amphora_mock.load_balancer_id = LB_ID
_amphora_mock.id = AMP_ID
_db_session = mock.MagicMock()
CONF = cfg.CONF
class TestException(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
@mock.patch('octavia.db.repositories.AmphoraRepository.get',
return_value=_amphora_mock)
@mock.patch('octavia.db.repositories.HealthMonitorRepository.get',
@ -79,7 +89,7 @@ CONF = cfg.CONF
@mock.patch('octavia.common.base_taskflow.BaseTaskFlowEngine._taskflow_load',
return_value=_flow_mock)
@mock.patch('taskflow.listeners.logging.DynamicLoggingListener')
@mock.patch('octavia.db.api.get_session', return_value='TEST')
@mock.patch('octavia.db.api.get_session', return_value=_db_session)
class TestControllerWorker(base.TestCase):
def setUp(self):
@ -162,7 +172,7 @@ class TestControllerWorker(base.TestCase):
cw.delete_amphora(AMP_ID)
mock_amp_repo_get.assert_called_once_with(
'TEST',
_db_session,
id=AMP_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
@ -585,7 +595,7 @@ class TestControllerWorker(base.TestCase):
cw.delete_load_balancer(LB_ID, cascade=False)
mock_lb_repo_get.assert_called_once_with(
'TEST',
_db_session,
id=LB_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
@ -623,7 +633,7 @@ class TestControllerWorker(base.TestCase):
cw.delete_load_balancer(LB_ID, cascade=True)
mock_lb_repo_get.assert_called_once_with(
'TEST',
_db_session,
id=LB_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
@ -666,7 +676,7 @@ class TestControllerWorker(base.TestCase):
cw.update_load_balancer(LB_ID, change)
mock_lb_repo_get.assert_called_once_with(
'TEST',
_db_session,
id=LB_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
@ -1159,9 +1169,81 @@ class TestControllerWorker(base.TestCase):
}))
_flow_mock.run.assert_called_once_with()
mock_update.assert_called_with('TEST', LB_ID,
mock_update.assert_called_with(_db_session, LB_ID,
provisioning_status=constants.ACTIVE)
@mock.patch('octavia.controller.worker.controller_worker.ControllerWorker.'
'_perform_amphora_failover')
def test_failover_amp_missing_amp(self,
mock_perform_amp_failover,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
mock_amp_repo_get):
mock_amp_repo_get.return_value = None
cw = controller_worker.ControllerWorker()
cw.failover_amphora(AMP_ID)
mock_perform_amp_failover.assert_not_called()
@mock.patch('octavia.controller.worker.controller_worker.ControllerWorker.'
'_perform_amphora_failover')
def test_failover_amp_flow_exception(self,
mock_perform_amp_failover,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
mock_amp_repo_get):
mock_perform_amp_failover.side_effect = TestException('boom')
cw = controller_worker.ControllerWorker()
self.assertRaises(TestException, cw.failover_amphora, AMP_ID)
@mock.patch('octavia.controller.worker.controller_worker.ControllerWorker.'
'_perform_amphora_failover')
@mock.patch('octavia.db.repositories.LoadBalancerRepository.update')
def test_failover_amp_no_lb(self,
mock_lb_update,
mock_perform_amp_failover,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
mock_amp_repo_get):
amphora = mock.MagicMock()
amphora.load_balancer_id = None
mock_amp_repo_get.return_value = amphora
cw = controller_worker.ControllerWorker()
cw.failover_amphora(AMP_ID)
mock_lb_update.assert_not_called()
mock_perform_amp_failover.assert_called_once_with(
amphora, constants.LB_CREATE_FAILOVER_PRIORITY)
@mock.patch('octavia.db.repositories.AmphoraHealthRepository.delete')
def test_failover_deleted_amphora(self,
mock_delete,
@ -1185,7 +1267,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw._perform_amphora_failover(mock_amphora, 10)
mock_delete.assert_called_with('TEST', amphora_id=AMP_ID)
mock_delete.assert_called_with(_db_session, amphora_id=AMP_ID)
mock_taskflow_load.assert_not_called()
@mock.patch('octavia.controller.worker.'
@ -1214,7 +1296,7 @@ class TestControllerWorker(base.TestCase):
cw.failover_loadbalancer('123')
mock_perform.assert_called_with(
_amphora_mock2, constants.LB_CREATE_ADMIN_FAILOVER_PRIORITY)
mock_update.assert_called_with('TEST', '123',
mock_update.assert_called_with(_db_session, '123',
provisioning_status=constants.ACTIVE)
mock_perform.reset
@ -1226,13 +1308,13 @@ class TestControllerWorker(base.TestCase):
# is the last one
mock_perform.assert_called_with(
_amphora_mock, constants.LB_CREATE_ADMIN_FAILOVER_PRIORITY)
mock_update.assert_called_with('TEST', '123',
mock_update.assert_called_with(_db_session, '123',
provisioning_status=constants.ACTIVE)
mock_perform.reset
mock_perform.side_effect = OverflowError()
self.assertRaises(OverflowError, cw.failover_loadbalancer, 123)
mock_update.assert_called_with('TEST', 123,
mock_update.assert_called_with(_db_session, 123,
provisioning_status=constants.ERROR)
@mock.patch('octavia.controller.worker.flows.'
@ -1277,7 +1359,7 @@ class TestControllerWorker(base.TestCase):
}))
_flow_mock.run.assert_called_once_with()
mock_update.assert_called_with('TEST', LB_ID,
mock_update.assert_called_with(_db_session, LB_ID,
provisioning_status=constants.ACTIVE)
@mock.patch('octavia.controller.worker.flows.'

View File

@ -0,0 +1,5 @@
---
fixes:
- |
Fixes an issue where if more than one amphora fails at the same time,
failover might not fully complete, leaving the load balancer in ERROR.

View File

@ -53,9 +53,12 @@ def generate(flow_list, output_directory):
get_flow_method = getattr(current_instance, current_tuple[2])
if (current_tuple[1] == 'AmphoraFlows' and
current_tuple[2] == 'get_failover_flow'):
amp1 = dmh.generate_amphora()
amp2 = dmh.generate_amphora()
lb = dmh.generate_load_balancer(amphorae=[amp1, amp2])
current_engine = engines.load(
get_flow_method(role=constants.ROLE_STANDALONE,
load_balancer_id=None))
load_balancer=lb))
elif (current_tuple[1] == 'LoadBalancerFlows' and
current_tuple[2] == 'get_create_load_balancer_flow'):
current_engine = engines.load(