Adding amphora failover flows

Added a flow to complete automated failover for an amphora.  Added new tasks
to retrieve ports, change port device ids for re-allocating to new amphora,
and added functionality to include existing ports on amphora creation.

Co-Authored-By: Brandon Logan <brandon.logan@rackspace.com>
Co-Authored-By: Michael Johnson <johnsomor@gmail.com>

Change-Id: Ic0d3f2b9a48ebf66df78e1ef98aef92ad7ee5057
This commit is contained in:
Trevor Vardeman 2015-07-15 16:52:12 -07:00 committed by Carlos Garza
parent 3b23de32b8
commit 2cfcf3eff1
23 changed files with 606 additions and 157 deletions

View File

@ -26,6 +26,7 @@ from octavia.amphorae.drivers import driver_base as driver_base
from octavia.amphorae.drivers.haproxy import exceptions as exc
from octavia.amphorae.drivers.haproxy.jinja import jinja_cfg
from octavia.common.config import cfg
from octavia.common import constants
from octavia.common import data_models as data_models
from octavia.common.tls_utils import cert_parser
from octavia.i18n import _LW
@ -66,20 +67,22 @@ class HaproxyAmphoraLoadBalancerDriver(driver_base.AmphoraLoadBalancerDriver):
certs['sni_certs'])
for amp in listener.load_balancer.amphorae:
self.client.upload_config(amp, listener.id, config)
# todo (german): add a method to REST interface to reload or start
# without having to check
# Is that listener running?
r = self.client.get_listener_status(amp,
listener.id)
if r['status'] == 'ACTIVE':
self.client.reload_listener(amp, listener.id)
else:
self.client.start_listener(amp, listener.id)
if amp.status != constants.DELETED:
self.client.upload_config(amp, listener.id, config)
# todo (german): add a method to REST interface to reload or
# start without having to check
# Is that listener running?
r = self.client.get_listener_status(amp,
listener.id)
if r['status'] == 'ACTIVE':
self.client.reload_listener(amp, listener.id)
else:
self.client.start_listener(amp, listener.id)
def _apply(self, func, listener=None, *args):
for amp in listener.load_balancer.amphorae:
func(amp, listener.id, *args)
if amp.status != constants.DELETED:
func(amp, listener.id, *args)
def stop(self, listener, vip):
self._apply(self.client.stop_listener, listener)
@ -101,19 +104,21 @@ class HaproxyAmphoraLoadBalancerDriver(driver_base.AmphoraLoadBalancerDriver):
def post_vip_plug(self, load_balancer, amphorae_network_config):
for amp in load_balancer.amphorae:
subnet = amphorae_network_config.get(amp.id).vip_subnet
# NOTE(blogan): using the vrrp port here because that is what the
# allowed address pairs network driver sets this particular port
# to. This does expose a bit of tight coupling between the network
# driver and amphora driver. We will need to revisit this to
# try and remove this tight coupling.
port = amphorae_network_config.get(amp.id).vrrp_port
net_info = {'subnet_cidr': subnet.cidr,
'gateway': subnet.gateway_ip,
'mac_address': port.mac_address}
self.client.plug_vip(amp,
load_balancer.vip.ip_address,
net_info)
if amp.status != constants.DELETED:
subnet = amphorae_network_config.get(amp.id).vip_subnet
# NOTE(blogan): using the vrrp port here because that
# is what the allowed address pairs network driver sets
# this particular port to. This does expose a bit of
# tight coupling between the network driver and amphora
# driver. We will need to revisit this to try and remove
# this tight coupling.
port = amphorae_network_config.get(amp.id).vrrp_port
net_info = {'subnet_cidr': subnet.cidr,
'gateway': subnet.gateway_ip,
'mac_address': port.mac_address}
self.client.plug_vip(amp,
load_balancer.vip.ip_address,
net_info)
def post_network_plug(self, amphora, port):
port_info = {'mac_address': port.mac_address}

View File

@ -25,6 +25,7 @@ from octavia.amphorae.driver_exceptions import exceptions as exc
from octavia.amphorae.drivers import driver_base as driver_base
from octavia.amphorae.drivers.haproxy.jinja import jinja_cfg
from octavia.common.config import cfg
from octavia.common import constants
from octavia.common.tls_utils import cert_parser
from octavia.i18n import _LW
@ -182,21 +183,21 @@ class HaproxyManager(driver_base.AmphoraLoadBalancerDriver):
load_balancer.id)
for amp in load_balancer.amphorae:
# Connect to amphora
self._connect(hostname=amp.lb_network_ip)
if amp.status != constants.DELETED:
# Connect to amphora
self._connect(hostname=amp.lb_network_ip)
mac = amphorae_network_config.get(amp.id).vrrp_port.mac_address
stdout, _ = self._execute_command(
CMD_GREP_LINK_BY_MAC.format(mac_address=mac))
iface = stdout[:-2]
if not iface:
self.client.close()
continue
self._configure_amp_interface(
iface, secondary_ip=load_balancer.vip.ip_address)
self._configure_amp_routes(
iface, amphorae_network_config.get(amp.id))
self.client.close()
mac = amphorae_network_config.get(amp.id).vrrp_port.mac_address
stdout, _ = self._execute_command(
CMD_GREP_LINK_BY_MAC.format(mac_address=mac))
iface = stdout[:-2]
if not iface:
self.client.close()
continue
self._configure_amp_interface(
iface, secondary_ip=load_balancer.vip.ip_address)
self._configure_amp_routes(
iface, amphorae_network_config.get(amp.id))
def post_network_plug(self, amphora, port):
self._connect(hostname=amphora.lb_network_ip)
@ -278,27 +279,28 @@ class HaproxyManager(driver_base.AmphoraLoadBalancerDriver):
temps.append(temp)
for amp in amphorae:
# Connect to amphora
self._connect(hostname=amp.lb_network_ip)
if amp.status != constants.DELETED:
# Connect to amphora
self._connect(hostname=amp.lb_network_ip)
# Setup for file upload
if make_dir:
mkdir_cmd = 'mkdir -p {0}'.format(make_dir)
self._execute_command(mkdir_cmd, run_as_root=True)
chown_cmd = 'chown -R {0} {1}'.format(
self.amp_config.username, make_dir)
self._execute_command(chown_cmd, run_as_root=True)
# Setup for file upload
if make_dir:
mkdir_cmd = 'mkdir -p {0}'.format(make_dir)
self._execute_command(mkdir_cmd, run_as_root=True)
chown_cmd = 'chown -R {0} {1}'.format(
self.amp_config.username, make_dir)
self._execute_command(chown_cmd, run_as_root=True)
# Upload files to location
if temps:
sftp = self.client.open_sftp()
for temp in temps:
sftp.put(temp.name, upload_dir)
# Upload files to location
if temps:
sftp = self.client.open_sftp()
for temp in temps:
sftp.put(temp.name, upload_dir)
# Execute remaining commands
for command in commands:
self._execute_command(command, run_as_root=True)
self.client.close()
# Execute remaining commands
for command in commands:
self._execute_command(command, run_as_root=True)
self.client.close()
# Close the temp file
for temp in temps:

View File

@ -80,11 +80,14 @@ SUPPORTED_AMPHORA_TYPES = (AMPHORA_VM,)
# Task/Flow constants
AMPHORA = 'amphora'
FAILOVER_AMPHORA = 'failover_amphora'
AMPHORAE = 'amphorae'
AMPHORA_ID = 'amphora_id'
FAILOVER_AMPHORA_ID = 'failover_amphora_id'
DELTA = 'delta'
DELTAS = 'deltas'
LISTENER = 'listener'
LISTENERS = 'listeners'
LOADBALANCER = 'loadbalancer'
LOADBALANCER_ID = 'loadbalancer_id'
COMPUTE_ID = 'compute_id'
@ -99,6 +102,8 @@ SERVER_PEM = 'server_pem'
VIP_NETWORK = 'vip_network'
AMPHORAE_NETWORK_CONFIG = 'amphorae_network_config'
ADDED_PORTS = 'added_ports'
PORTS = 'ports'
MEMBER_PORTS = 'member_ports'
CREATE_AMPHORA_FLOW = 'octavia-create-amphora-flow'
CREATE_AMPHORA_FOR_LB_FLOW = 'octavia-create-amp-for-lb-flow'
@ -119,6 +124,7 @@ UPDATE_LISTENER_FLOW = 'octavia-update-listener-flow'
UPDATE_LOADBALANCER_FLOW = 'octavia-update-loadbalancer-flow'
UPDATE_MEMBER_FLOW = 'octavia-update-member-flow'
UPDATE_POOL_FLOW = 'octavia-update-pool-flow'
FAILOVER_AMPHORA_FLOW = 'octavia-failover-amphora-flow'
# Task Names
RELOAD_LB_AFTER_AMP_ASSOC = 'reload-lb-after-amp-assoc'

View File

@ -47,10 +47,10 @@ class ComputeBase(object):
pass
@abc.abstractmethod
def delete(self, amphora_id):
def delete(self, compute_id):
"""Delete the specified amphora
:param amphora_id: The id of the amphora to delete
:param compute_id: The id of the amphora to delete
"""
pass

View File

@ -40,7 +40,7 @@ class VirtualMachineManager(compute_base.ComputeBase):
def build(self, name="amphora_name", amphora_flavor=None, image_id=None,
key_name=None, sec_groups=None, network_ids=None,
config_drive_files=None, user_data=None):
port_ids=None, config_drive_files=None, user_data=None):
'''Create a new virtual machine.
:param name: optional name for amphora
@ -49,6 +49,7 @@ class VirtualMachineManager(compute_base.ComputeBase):
:param key_name: keypair to add to the virtual machine
:param sec_groups: Security group IDs for virtual machine
:param network_ids: Network IDs to include on virtual machine
:param port_ids: Port IDs to include on virtual machine
:param config_drive_files: An optional dict of files to overwrite on
the server upon boot. Keys are file names (i.e. /etc/passwd)
and values are the file contents (either as a string or as
@ -63,9 +64,13 @@ class VirtualMachineManager(compute_base.ComputeBase):
'''
try:
network_ids = network_ids or []
port_ids = port_ids or []
nics = []
for net_id in network_ids:
nics.append({"net-id": net_id})
if network_ids:
nics.extend([{"net-id": net_id} for net_id in network_ids])
if port_ids:
nics.extend([{"port-id": port_id} for port_id in port_ids])
amphora = self.manager.create(
name=name, image=image_id, flavor=amphora_flavor,
@ -81,13 +86,13 @@ class VirtualMachineManager(compute_base.ComputeBase):
LOG.exception(_LE("Error building nova virtual machine."))
raise exceptions.ComputeBuildException()
def delete(self, amphora_id):
def delete(self, compute_id):
'''Delete a virtual machine.
:param amphora_id: virtual machine UUID
:param compute_id: virtual machine UUID
'''
try:
self.manager.delete(server=amphora_id)
self.manager.delete(server=compute_id)
except Exception:
LOG.exception(_LE("Error deleting nova virtual machine."))
raise exceptions.ComputeDeleteException()

View File

@ -458,3 +458,22 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
with tf_logging.DynamicLoggingListener(update_pool_tf,
log=LOG):
update_pool_tf.run()
def failover_amphora(self, amphora_id):
"""Perform failover operations for an amphora.
:param amphora_id: ID for amphora to failover
:returns: None
:raises AmphoraNotFound: The referenced amphora was not found
"""
amp = self._amphora_repo.get(db_apis.get_session(),
id=amphora_id)
failover_amphora_tf = self._taskflow_load(
self._amphora_flows.get_failover_flow(),
store={constants.AMPHORA: amp,
constants.LOADBALANCER_ID: amp.load_balancer_id})
with tf_logging.DynamicLoggingListener(failover_amphora_tf,
log=LOG):
failover_amphora_tf.run()

View File

@ -23,6 +23,7 @@ from octavia.controller.worker.tasks import amphora_driver_tasks
from octavia.controller.worker.tasks import cert_task
from octavia.controller.worker.tasks import compute_tasks
from octavia.controller.worker.tasks import database_tasks
from octavia.controller.worker.tasks import network_tasks
CONF = cfg.CONF
@ -153,3 +154,94 @@ class AmphoraFlows(object):
MarkAmphoraDeletedInDB(
requires='amphora'))
return delete_amphora_flow
def get_failover_flow(self):
"""Creates a flow to failover a stale amphora
:returns: The flow for amphora failover
"""
failover_amphora_flow = linear_flow.Flow(
constants.FAILOVER_AMPHORA_FLOW)
failover_amphora_flow.add(
network_tasks.RetrievePortIDsOnAmphoraExceptLBNetwork(
requires=constants.AMPHORA, provides=constants.PORTS))
failover_amphora_flow.add(network_tasks.FailoverPreparationForAmphora(
requires=constants.AMPHORA))
failover_amphora_flow.add(compute_tasks.ComputeDelete(
requires=constants.AMPHORA))
failover_amphora_flow.add(database_tasks.MarkAmphoraDeletedInDB(
requires=constants.AMPHORA))
failover_amphora_flow.add(database_tasks.CreateAmphoraInDB(
provides=constants.AMPHORA_ID))
failover_amphora_flow.add(
database_tasks.GetUpdatedFailoverAmpNetworkDetailsAsList(
requires=(constants.AMPHORA_ID, constants.AMPHORA),
provides=constants.AMPS_DATA))
if self.REST_AMPHORA_DRIVER:
failover_amphora_flow.add(cert_task.GenerateServerPEMTask(
provides=constants.SERVER_PEM))
failover_amphora_flow.add(compute_tasks.CertComputeCreate(
requires=(constants.AMPHORA_ID, constants.SERVER_PEM),
provides=constants.COMPUTE_ID))
else:
failover_amphora_flow.add(compute_tasks.ComputeCreate(
requires=constants.AMPHORA_ID, provides=constants.COMPUTE_ID))
failover_amphora_flow.add(database_tasks.UpdateAmphoraComputeId(
requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
failover_amphora_flow.add(
database_tasks.AssociateFailoverAmphoraWithLBID(
requires=(constants.AMPHORA_ID, constants.LOADBALANCER_ID)))
failover_amphora_flow.add(database_tasks.MarkAmphoraBootingInDB(
requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
wait_flow = linear_flow.Flow('wait_for_amphora',
retry=retry.Times(CONF.
controller_worker.
amp_active_retries))
wait_flow.add(compute_tasks.ComputeWait(
requires=constants.COMPUTE_ID,
provides=constants.COMPUTE_OBJ))
wait_flow.add(database_tasks.UpdateAmphoraInfo(
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
provides=constants.AMPHORA))
failover_amphora_flow.add(wait_flow)
failover_amphora_flow.add(database_tasks.ReloadAmphora(
requires=constants.AMPHORA_ID,
provides=constants.FAILOVER_AMPHORA))
failover_amphora_flow.add(amphora_driver_tasks.AmphoraFinalize(
rebind={constants.AMPHORA: constants.FAILOVER_AMPHORA},
requires=constants.AMPHORA))
failover_amphora_flow.add(database_tasks.UpdateAmphoraVIPData(
requires=constants.AMPS_DATA))
failover_amphora_flow.add(database_tasks.ReloadLoadBalancer(
requires=constants.LOADBALANCER_ID,
provides=constants.LOADBALANCER))
failover_amphora_flow.add(network_tasks.GetAmphoraeNetworkConfigs(
requires=constants.LOADBALANCER,
provides=constants.AMPHORAE_NETWORK_CONFIG))
failover_amphora_flow.add(database_tasks.GetListenersFromLoadbalancer(
requires=constants.LOADBALANCER, provides=constants.LISTENERS))
failover_amphora_flow.add(database_tasks.GetVipFromLoadbalancer(
requires=constants.LOADBALANCER, provides=constants.VIP))
failover_amphora_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=(constants.LISTENERS, constants.VIP)))
failover_amphora_flow.add(amphora_driver_tasks.AmphoraPostVIPPlug(
requires=(constants.LOADBALANCER,
constants.AMPHORAE_NETWORK_CONFIG)))
failover_amphora_flow.add(
network_tasks.GetMemberPorts(
rebind={constants.AMPHORA: constants.FAILOVER_AMPHORA},
requires=(constants.LOADBALANCER, constants.AMPHORA),
provides=constants.MEMBER_PORTS
))
failover_amphora_flow.add(amphora_driver_tasks.AmphoraPostNetworkPlug(
rebind={constants.AMPHORA: constants.FAILOVER_AMPHORA,
constants.PORTS: constants.MEMBER_PORTS},
requires=(constants.AMPHORA, constants.PORTS)))
failover_amphora_flow.add(amphora_driver_tasks.ListenersStart(
requires=(constants.LISTENERS, constants.VIP)))
failover_amphora_flow.add(database_tasks.MarkAmphoraAllocatedInDB(
rebind={constants.AMPHORA: constants.FAILOVER_AMPHORA},
requires=(constants.AMPHORA, constants.LOADBALANCER_ID)))
return failover_amphora_flow

View File

@ -62,6 +62,24 @@ class ListenerUpdate(BaseAmphoraTask):
return None
class ListenersUpdate(BaseAmphoraTask):
"""Task to update amphora with all listeners' configurations."""
def execute(self, listeners, vip):
"""Execute updates per listener for an amphora."""
for listener in listeners:
self.amphora_driver.update(listener, vip)
def revert(self, listeners, *args, **kwargs):
"""Handle failed listeners updates."""
LOG.warn(_LW("Reverting listeners updates."))
for listener in listeners:
self.listener_repo.update(db_apis.get_session(), id=listener.id,
provisioning_status=constants.ERROR)
return None
class ListenerStop(BaseAmphoraTask):
"""Task to stop the listener on the vip."""
@ -96,6 +114,25 @@ class ListenerStart(BaseAmphoraTask):
return None
class ListenersStart(BaseAmphoraTask):
"""Task to start all listeners on the vip."""
def execute(self, listeners, vip):
"""Execute listener start routines for listeners on an amphora."""
for listener in listeners:
self.amphora_driver.start(listener, vip)
LOG.debug("Started the listeners on the vip")
def revert(self, listeners, *args, **kwargs):
"""Handle failed listeners starts."""
LOG.warn(_LW("Reverting listeners starts."))
for listener in listeners:
self.listener_repo.update(db_apis.get_session(), id=listener.id,
provisioning_status=constants.ERROR)
return None
class ListenerDelete(BaseAmphoraTask):
"""Task to delete the listener on the vip."""
@ -148,10 +185,13 @@ class AmphoraFinalize(BaseAmphoraTask):
class AmphoraPostNetworkPlug(BaseAmphoraTask):
"""Task to notify the amphora post network plug."""
def execute(self, amphora):
def execute(self, amphora, ports):
"""Execute post_network_plug routine."""
self.amphora_driver.post_network_plug(amphora)
LOG.debug("Posted network plug for the compute instance")
for port in ports:
self.amphora_driver.post_network_plug(amphora, port)
LOG.debug("post_network_plug called on compute instance "
"{compute_id} for port {port_id}".format(
compute_id=amphora.compute_id, port_id=port.id))
def revert(self, result, amphora, *args, **kwargs):
"""Handle a failed post network plug."""
@ -167,16 +207,12 @@ class AmphoraePostNetworkPlug(BaseAmphoraTask):
def execute(self, loadbalancer, added_ports):
"""Execute post_network_plug routine."""
amp_post_plug = AmphoraPostNetworkPlug()
for amphora in loadbalancer.amphorae:
if amphora.id in added_ports:
for port in added_ports[amphora.id]:
self.amphora_driver.post_network_plug(amphora, port)
LOG.debug(
"post_network_plug called on compute instance "
"{compute_id} for port {port_id}".format(
compute_id=amphora.compute_id, port_id=port.id))
amp_post_plug.execute(amphora, added_ports[amphora.id])
def revert(self, result, loadbalancer, deltas, *args, **kwargs):
def revert(self, result, loadbalancer, added_ports, *args, **kwargs):
"""Handle a failed post network plug."""
if isinstance(result, failure.Failure):
return

View File

@ -46,7 +46,7 @@ class BaseComputeTask(task.Task):
class ComputeCreate(BaseComputeTask):
"""Create the compute instance for a new amphora."""
def execute(self, amphora_id, config_drive_files=None):
def execute(self, amphora_id, ports=None, config_drive_files=None):
"""Create an amphora
:returns: an amphora
@ -63,6 +63,7 @@ class ComputeCreate(BaseComputeTask):
key_name=CONF.controller_worker.amp_ssh_key_name,
sec_groups=CONF.controller_worker.amp_secgroup_list,
network_ids=[CONF.controller_worker.amp_network],
port_ids=[port.id for port in ports] if ports else [],
config_drive_files=config_drive_files)
LOG.debug("Server created with id: %s for amphora id: %s" %
@ -94,7 +95,7 @@ class ComputeCreate(BaseComputeTask):
class CertComputeCreate(ComputeCreate):
def execute(self, amphora_id, server_pem):
def execute(self, amphora_id, server_pem, ports=None):
"""Create an amphora
:returns: an amphora
@ -106,8 +107,8 @@ class CertComputeCreate(ComputeCreate):
# '/etc/octavia/octavia.conf'
'/etc/octavia/certs/server.pem': server_pem,
'/etc/octavia/certs/client_ca.pem': client_ca}
return super(CertComputeCreate, self).execute(amphora_id,
config_drive_files)
return super(CertComputeCreate, self).execute(
amphora_id, ports=ports, config_drive_files=config_drive_files)
class DeleteAmphoraeOnLoadBalancer(BaseComputeTask):
@ -119,7 +120,7 @@ class DeleteAmphoraeOnLoadBalancer(BaseComputeTask):
def execute(self, loadbalancer):
for amp in loadbalancer.amphorae:
try:
self.compute.delete(amphora_id=amp.compute_id)
self.compute.delete(compute_id=amp.compute_id)
except Exception as e:
LOG.error(_LE("Nova delete for amphora id: %(amp)s failed:"
"%(exp)s"), {'amp': amp.id, 'exp': e})
@ -128,14 +129,13 @@ class DeleteAmphoraeOnLoadBalancer(BaseComputeTask):
class ComputeDelete(BaseComputeTask):
def execute(self, amphora):
compute_id = amphora.compute_id
LOG.debug("Nova Delete execute for amphora with id %s" % compute_id)
LOG.debug("Nova Delete execute for amphora with id %s" % amphora.id)
try:
self.compute.delete(amphora_id=compute_id)
self.compute.delete(compute_id=amphora.compute_id)
except Exception as e:
LOG.error(_LE("Nova delete for amphora id: %(amp)s failed:"
"%(exp)s"), {'amp': compute_id, 'exp': e})
"%(exp)s"), {'amp': amphora.id, 'exp': e})
raise e

View File

@ -20,6 +20,7 @@ from taskflow import task
from taskflow.types import failure
from octavia.common import constants
from octavia.common import data_models
from octavia.common import exceptions
from octavia.db import api as db_apis
from octavia.db import repositories as repo
@ -257,6 +258,18 @@ class UpdateAmphoraVIPData(BaseDatabaseTask):
ha_port_id=amp_data.ha_port_id)
class AssociateFailoverAmphoraWithLBID(BaseDatabaseTask):
def execute(self, amphora_id, loadbalancer_id):
self.repos.amphora.associate(db_apis.get_session(),
load_balancer_id=loadbalancer_id,
amphora_id=amphora_id)
def revert(self, amphora_id):
self.repos.amphora.update(db_apis.get_session(), amphora_id,
loadbalancer_id=None)
class MapLoadbalancerToAmphora(BaseDatabaseTask):
"""Maps and assigns a load balancer to an amphora in the database."""
@ -812,3 +825,34 @@ class UpdatePoolInDB(BaseDatabaseTask):
# TODO(johnsom) fix this to set the upper ojects to ERROR
self.pool_repo.update(db_apis.get_session(), pool.id,
enabled=0)
class GetUpdatedFailoverAmpNetworkDetailsAsList(BaseDatabaseTask):
"""Task to retrieve amphora network details."""
def execute(self, amphora_id, amphora):
amp_net_configs = [data_models.Amphora(
id=amphora_id,
vrrp_ip=amphora.vrrp_ip,
ha_ip=amphora.ha_ip,
vrrp_port_id=amphora.vrrp_port_id,
ha_port_id=amphora.ha_port_id)]
return amp_net_configs
class GetListenersFromLoadbalancer(BaseDatabaseTask):
"""Task to pull the listener from a loadbalancer."""
def execute(self, loadbalancer):
listeners = []
for listener in loadbalancer.listeners:
l = self.listener_repo.get(db_apis.get_session(), id=listener.id)
listeners.append(l)
return listeners
class GetVipFromLoadbalancer(BaseDatabaseTask):
"""Task to pull the vip from a loadbalancer."""
def execute(self, loadbalancer):
return loadbalancer.vip

View File

@ -24,7 +24,7 @@ from taskflow.types import failure
from octavia.common import constants
from octavia.i18n import _LW, _LE
from octavia.network import base
from octavia.network import data_models
from octavia.network import data_models as n_data_models
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
@ -45,6 +45,50 @@ class BaseNetworkTask(task.Task):
).driver
class CalculateAmphoraDelta(BaseNetworkTask):
default_provides = constants.DELTA
def execute(self, loadbalancer, amphora):
LOG.debug("Calculating network delta for amphora id: %s" % amphora.id)
# Figure out what networks we want
# seed with lb network(s)
subnet = self.network_driver.get_subnet(loadbalancer.vip.subnet_id)
desired_network_ids = {CONF.controller_worker.amp_network,
subnet.network_id}
if not loadbalancer.listeners:
return None
for listener in loadbalancer.listeners:
if (not listener.default_pool) or (
not listener.default_pool.members):
continue
member_networks = [
self.network_driver.get_subnet(member.subnet_id).network_id
for member in listener.default_pool.members
if member.subnet_id
]
desired_network_ids.update(member_networks)
nics = self.network_driver.get_plugged_networks(amphora.compute_id)
# assume we don't have two nics in the same network
actual_network_nics = dict((nic.network_id, nic) for nic in nics)
del_ids = set(actual_network_nics) - desired_network_ids
delete_nics = list(
actual_network_nics[net_id] for net_id in del_ids)
add_ids = desired_network_ids - set(actual_network_nics)
add_nics = list(n_data_models.Interface(
network_id=net_id) for net_id in add_ids)
delta = n_data_models.Delta(
amphora_id=amphora.id, compute_id=amphora.compute_id,
add_nics=add_nics, delete_nics=delete_nics)
return delta
class CalculateDelta(BaseNetworkTask):
"""Task to calculate the delta between
@ -59,53 +103,17 @@ class CalculateDelta(BaseNetworkTask):
"""Compute which NICs need to be plugged
for the amphora to become operational.
:param amphora - the amphora configuration we
want to achieve
:param nics - the nics on the real amphora
:returns the delta
:param loadbalancer: the loadbalancer to calculate deltas for all
amphorae
:returns: dict of octavia.network.data_models.Delta keyed off amphora
id
"""
calculate_amp = CalculateAmphoraDelta()
deltas = {}
for amphora in loadbalancer.amphorae:
LOG.debug("Calculating network delta for amphora id: %s"
% amphora.id)
# Figure out what networks we want
# seed with lb network(s)
subnet = self.network_driver.get_subnet(loadbalancer.vip.subnet_id)
desired_network_ids = {CONF.controller_worker.amp_network,
subnet.network_id}
if not loadbalancer.listeners:
return {}
for listener in loadbalancer.listeners:
if (not listener.default_pool) or (
not listener.default_pool.members):
continue
member_networks = [
self.network_driver.get_subnet(member.subnet_id).network_id
for member in listener.default_pool.members
if member.subnet_id
]
desired_network_ids.update(member_networks)
nics = self.network_driver.get_plugged_networks(amphora.compute_id)
# assume we don't have two nics in the same network
actual_network_nics = dict((nic.network_id, nic) for nic in nics)
del_ids = set(actual_network_nics) - desired_network_ids
delete_nics = list(
actual_network_nics[net_id] for net_id in del_ids)
add_ids = desired_network_ids - set(actual_network_nics)
add_nics = list(data_models.Interface(
network_id=net_id) for net_id in add_ids)
deltas[amphora.id] = data_models.Delta(
amphora_id=amphora.id, compute_id=amphora.compute_id,
add_nics=add_nics, delete_nics=delete_nics)
delta = calculate_amp.execute(loadbalancer, amphora)
deltas[amphora.id] = delta
return deltas
@ -190,6 +198,25 @@ class UnPlugNetworks(BaseNetworkTask):
pass # Todo(german) follow up if that makes sense
class GetMemberPorts(BaseNetworkTask):
def execute(self, loadbalancer, amphora):
vip_port = self.network_driver.get_port(loadbalancer.vip.port_id)
member_ports = []
interfaces = self.network_driver.get_plugged_networks(
amphora.compute_id)
for interface in interfaces:
port = self.network_driver.get_port(interface.port_id)
if vip_port.network_id == port.network_id:
continue
port.network = self.network_driver.get_network(port.network_id)
for fixed_ip in port.fixed_ips:
fixed_ip.subnet = self.network_driver.get_subnet(
fixed_ip.subnet_id)
member_ports.append(port)
return member_ports
class HandleNetworkDeltas(BaseNetworkTask):
"""Task to plug and unplug networks
@ -275,9 +302,9 @@ class UnplugVIP(BaseNetworkTask):
try:
self.network_driver.unplug_vip(loadbalancer, loadbalancer.vip)
except Exception as e:
LOG.error(_LE("Unable to unplug vip from load balancer %(id)s: "
"exception: %(ex)s"), id=loadbalancer.id,
ex=e.message)
LOG.error(_LE("Unable to unplug vip from load balancer"
"{id}: exception: {ex}").format(id=loadbalancer.id,
ex=e.message))
class AllocateVIP(BaseNetworkTask):
@ -334,21 +361,56 @@ class GetAmphoraeNetworkConfigs(BaseNetworkTask):
vip_port = self.network_driver.get_port(loadbalancer.vip.port_id)
amp_net_configs = {}
for amp in loadbalancer.amphorae:
LOG.debug("Retrieving network details for "
"amphora {0}".format(amp.id))
vrrp_port = self.network_driver.get_port(amp.vrrp_port_id)
vrrp_subnet = self.network_driver.get_subnet(
vrrp_port.get_subnet_id(amp.vrrp_ip))
ha_port = self.network_driver.get_port(amp.ha_port_id)
ha_subnet = self.network_driver.get_subnet(
ha_port.get_subnet_id(amp.ha_ip))
amp_net_configs[amp.id] = data_models.AmphoraNetworkConfig(
amphora=amp,
vip_subnet=vip_subnet,
vip_port=vip_port,
vrrp_subnet=vrrp_subnet,
vrrp_port=vrrp_port,
ha_subnet=ha_subnet,
ha_port=ha_port
)
if amp.status != constants.DELETED:
LOG.debug("Retrieving network details for "
"amphora {0}".format(amp.id))
vrrp_port = self.network_driver.get_port(amp.vrrp_port_id)
vrrp_subnet = self.network_driver.get_subnet(
vrrp_port.get_subnet_id(amp.vrrp_ip))
ha_port = self.network_driver.get_port(amp.ha_port_id)
ha_subnet = self.network_driver.get_subnet(
ha_port.get_subnet_id(amp.ha_ip))
amp_net_configs[amp.id] = n_data_models.AmphoraNetworkConfig(
amphora=amp,
vip_subnet=vip_subnet,
vip_port=vip_port,
vrrp_subnet=vrrp_subnet,
vrrp_port=vrrp_port,
ha_subnet=ha_subnet,
ha_port=ha_port
)
return amp_net_configs
class FailoverPreparationForAmphora(BaseNetworkTask):
"""Task to prepare an amphora for failover."""
def execute(self, amphora):
LOG.debug("Prepare amphora %s for failover." % amphora.id)
self.network_driver.failover_preparation(amphora)
class RetrievePortIDsOnAmphoraExceptLBNetwork(BaseNetworkTask):
"""Task retrieving all the port ids on an amphora, except lb network."""
def execute(self, amphora):
LOG.debug("Retrieve all but the lb network port id on amphora %s." %
amphora.id)
interfaces = self.network_driver.get_plugged_networks(
compute_id=amphora.compute_id)
ports = []
for interface_ in interfaces:
if interface_.port_id not in ports:
port = self.network_driver.get_port(port_id=interface_.port_id)
ips = port.fixed_ips
lb_network = False
for ip in ips:
if ip.ip_address == amphora.lb_network_ip:
lb_network = True
if not lb_network:
ports.append(port)
return ports

View File

@ -214,3 +214,13 @@ class AbstractNetworkDriver(object):
:raises: NetworkException, PortNotFound
"""
pass
@abc.abstractmethod
def failover_preparation(self, amphora):
"""Prepare an amphora for failover.
:param amphora: amphora object to failover
:return: None
:raises: PortNotFound
"""
pass

View File

@ -272,6 +272,7 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
msg = ('Amphora with compute id {compute_id} does not have any '
'plugged networks').format(compute_id=compute_id)
raise base.AmphoraNotFound(msg)
unpluggers = self._get_interfaces_to_unplug(interfaces, network_id,
ip_address=ip_address)
try:
@ -299,3 +300,24 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
def update_vip(self, load_balancer):
sec_grp = self._get_lb_security_group(load_balancer.id)
self._update_security_group_rules(load_balancer, sec_grp.get('id'))
def failover_preparation(self, amphora):
interfaces = self.get_plugged_networks(compute_id=amphora.compute_id)
ports = []
for interface_ in interfaces:
port = self.get_port(port_id=interface_.port_id)
ips = port.fixed_ips
lb_network = False
for ip in ips:
if ip.ip_address == amphora.lb_network_ip:
lb_network = True
if not lb_network:
ports.append(port)
for port in ports:
try:
self.neutron_client.update_port(port.id,
{'port': {'device_id': ''}})
except neutron_client_exceptions.NotFound:
raise base.PortNotFound()

View File

@ -92,6 +92,10 @@ class NoopManager(object):
self.__class__.__name__, port_id)
self.networkconfigconfig[port_id] = (port_id, 'get_port')
def failover_preparation(self, amphora):
LOG.debug("failover %s no-op, failover_preparation, amphora id %s",
self.__class__.__name__, amphora.id)
class NoopNetworkDriver(driver_base.AbstractNetworkDriver):
def __init__(self, region=None):
@ -131,3 +135,6 @@ class NoopNetworkDriver(driver_base.AbstractNetworkDriver):
def get_port(self, port_id):
self.driver.get_port(port_id)
def failover_preparation(self, amphora):
self.driver.failover_preparation(amphora)

View File

@ -67,6 +67,7 @@ class TestNovaClient(base.TestCase):
key_name=1,
sec_groups=1,
network_ids=[1],
port_ids=[2],
user_data='Blah',
config_drive_files='Files Blah')
@ -74,7 +75,7 @@ class TestNovaClient(base.TestCase):
self.manager.manager.create.assert_called_with(
name="amphora_name",
nics=[{'net-id': 1}],
nics=[{'net-id': 1}, {'port-id': 2}],
image=1,
flavor=1,
key_name=1,

View File

@ -112,3 +112,24 @@ class TestAmphoraFlows(base.TestCase):
self.assertEqual(len(amp_flow.provides), 0)
self.assertEqual(len(amp_flow.requires), 1)
def test_get_failover_flow(self):
amp_flow = self.AmpFlow.get_failover_flow()
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn(constants.AMPHORA, amp_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
self.assertIn(constants.FAILOVER_AMPHORA, amp_flow.provides)
self.assertIn(constants.AMPHORA, amp_flow.provides)
self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
self.assertIn(constants.COMPUTE_OBJ, amp_flow.provides)
self.assertIn(constants.AMPS_DATA, amp_flow.provides)
self.assertIn(constants.PORTS, amp_flow.provides)
self.assertIn(constants.LISTENERS, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(len(amp_flow.requires), 2)
self.assertEqual(len(amp_flow.provides), 12)

View File

@ -39,6 +39,8 @@ _vip_mock = mock.MagicMock()
_LB_mock = mock.MagicMock()
_amphorae_mock = [_amphora_mock]
_network_mock = mock.MagicMock()
_port_mock = mock.MagicMock()
_ports_mock = [_port_mock]
@mock.patch('octavia.db.repositories.AmphoraRepository.update')
@ -204,10 +206,10 @@ class TestAmphoraDriverTasks(base.TestCase):
amphora_post_network_plug_obj = (amphora_driver_tasks.
AmphoraPostNetworkPlug())
amphora_post_network_plug_obj.execute(_amphora_mock)
amphora_post_network_plug_obj.execute(_amphora_mock, _ports_mock)
(mock_driver.post_network_plug.
assert_called_once_with)(_amphora_mock)
assert_called_once_with)(_amphora_mock, _port_mock)
# Test revert
amp = amphora_post_network_plug_obj.revert(None, _amphora_mock)

View File

@ -43,6 +43,7 @@ AMP_WAIT = 12
AMPHORA_ID = uuidutils.generate_uuid()
COMPUTE_ID = uuidutils.generate_uuid()
LB_NET_IP = '192.0.2.1'
PORT_ID = uuidutils.generate_uuid()
AUTH_VERSION = '2'
@ -55,9 +56,12 @@ class TestException(Exception):
return repr(self.value)
_amphora_mock = mock.MagicMock()
_amphora_mock.id = AMPHORA_ID
_amphora_mock.compute_id = COMPUTE_ID
_load_balancer_mock = mock.MagicMock()
_load_balancer_mock.amphorae = [_amphora_mock]
_port = mock.MagicMock()
_port.id = PORT_ID
class TestComputeTasks(base.TestCase):
@ -86,7 +90,7 @@ class TestComputeTasks(base.TestCase):
mock_driver.build.side_effect = [COMPUTE_ID, TestException('test')]
# Test execute()
compute_id = createcompute.execute(_amphora_mock.id)
compute_id = createcompute.execute(_amphora_mock.id, ports=[_port])
# Validate that the build method was called properly
mock_driver.build.assert_called_once_with(
@ -96,6 +100,7 @@ class TestComputeTasks(base.TestCase):
key_name=AMP_SSH_KEY_NAME,
sec_groups=AMP_SEC_GROUPS,
network_ids=[AMP_NET],
port_ids=[PORT_ID],
config_drive_files=None)
# Make sure it returns the expected compute_id
@ -105,7 +110,7 @@ class TestComputeTasks(base.TestCase):
createcompute = compute_tasks.ComputeCreate()
self.assertRaises(TestException,
createcompute.execute,
_amphora_mock, 'test_cert')
_amphora_mock, config_drive_files='test_cert')
# Test revert()
@ -131,7 +136,8 @@ class TestComputeTasks(base.TestCase):
m = mock.mock_open(read_data='test')
with mock.patch('%s.open' % BUILTINS, m, create=True):
# Test execute()
compute_id = createcompute.execute(_amphora_mock.id, 'test_cert')
compute_id = createcompute.execute(_amphora_mock.id,
'test_cert')
# Validate that the build method was called properly
mock_driver.build.assert_called_once_with(
@ -141,6 +147,7 @@ class TestComputeTasks(base.TestCase):
key_name=AMP_SSH_KEY_NAME,
sec_groups=AMP_SEC_GROUPS,
network_ids=[AMP_NET],
port_ids=[],
config_drive_files={
'/etc/octavia/certs/server.pem': 'test_cert',
'/etc/octavia/certs/client_ca.pem': m.return_value})
@ -153,7 +160,7 @@ class TestComputeTasks(base.TestCase):
createcompute = compute_tasks.ComputeCreate()
self.assertRaises(TestException,
createcompute.execute,
_amphora_mock, 'test_cert')
_amphora_mock, config_drive_files='test_cert')
# Test revert()
@ -201,7 +208,7 @@ class TestComputeTasks(base.TestCase):
delete_amps = compute_tasks.DeleteAmphoraeOnLoadBalancer()
delete_amps.execute(_load_balancer_mock)
mock_driver.delete.assert_called_once_with(amphora_id=COMPUTE_ID)
mock_driver.delete.assert_called_once_with(compute_id=COMPUTE_ID)
@mock.patch('stevedore.driver.DriverManager.driver')
def test_compute_delete(self, mock_driver):
@ -209,4 +216,4 @@ class TestComputeTasks(base.TestCase):
delete_compute = compute_tasks.ComputeDelete()
delete_compute.execute(_amphora_mock)
mock_driver.delete.assert_called_once_with(amphora_id=COMPUTE_ID)
mock_driver.delete.assert_called_once_with(compute_id=COMPUTE_ID)

View File

@ -38,6 +38,11 @@ IP_ADDRESS = "172.24.41.1"
VIP = o_data_models.Vip(port_id=PORT_ID, subnet_id=SUBNET_ID,
ip_address=IP_ADDRESS)
LB = o_data_models.LoadBalancer(vip=VIP)
FIRST_IP = {"ip_address": IP_ADDRESS, "subnet_id": SUBNET_ID}
FIXED_IPS = [FIRST_IP]
INTERFACE = data_models.Interface(id=uuidutils.generate_uuid(),
compute_id=COMPUTE_ID, fixed_ips=FIXED_IPS,
port_id=PORT_ID)
class TestException(Exception):
@ -85,7 +90,8 @@ class TestNetworkTasks(base.TestCase):
self.amphora_mock.load_balancer = self.load_balancer_mock
self.load_balancer_mock.amphorae = [self.amphora_mock]
self.load_balancer_mock.listeners = None
self.assertEqual(EMPTY, net.execute(self.load_balancer_mock))
self.assertEqual({self.amphora_mock.id: None},
net.execute(self.load_balancer_mock))
listener_mock = mock.MagicMock()
self.load_balancer_mock.listeners = [listener_mock]
@ -354,3 +360,10 @@ class TestNetworkTasks(base.TestCase):
lb = o_data_models.LoadBalancer(vip=vip)
net.execute(lb)
mock_driver.deallocate_vip.assert_called_once_with(lb.vip)
def test_failover_preparation_for_amphora(self, mock_driver):
failover = network_tasks.FailoverPreparationForAmphora()
amphora = o_data_models.Amphora(id=AMPHORA_ID,
lb_network_ip=IP_ADDRESS)
failover.execute(amphora)
mock_driver.failover_preparation.assert_called_once_with(amphora)

View File

@ -32,6 +32,7 @@ LB_ID = uuidutils.generate_uuid()
POOL_ID = uuidutils.generate_uuid()
HM_ID = uuidutils.generate_uuid()
MEMBER_ID = uuidutils.generate_uuid()
COMPUTE_ID = uuidutils.generate_uuid()
HEALTH_UPDATE_DICT = {'delay': 1, 'timeout': 2}
LISTENER_UPDATE_DICT = {'name': 'test', 'description': 'test2'}
MEMBER_UPDATE_DICT = {'weight': 1, 'ip_address': '10.0.0.0'}
@ -46,6 +47,7 @@ _load_balancer_mock = mock.MagicMock()
_member_mock = mock.MagicMock()
_pool_mock = mock.MagicMock()
_create_map_flow_mock = mock.MagicMock()
_amphora_mock.load_balancer_id = LB_ID
@mock.patch('octavia.db.repositories.AmphoraRepository.get',
@ -633,3 +635,32 @@ class TestControllerWorker(base.TestCase):
'update_dict': POOL_UPDATE_DICT}))
_flow_mock.run.assert_called_once_with()
@mock.patch('octavia.controller.worker.flows.'
'amphora_flows.AmphoraFlows.get_failover_flow',
return_value=_flow_mock)
def test_failover_amphora(self,
mock_get_update_listener_flow,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
mock_amp_repo_get):
_flow_mock.reset_mock()
cw = controller_worker.ControllerWorker()
cw.failover_amphora(AMP_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(
_flow_mock,
store={constants.AMPHORA: _amphora_mock,
constants.LOADBALANCER_ID:
_amphora_mock.load_balancer_id}))
_flow_mock.run.assert_called_once_with()

View File

@ -34,6 +34,23 @@ class TestAllowedAddressPairsDriver(base.TestCase):
k_session = None
driver = None
SUBNET_ID_1 = "5"
SUBNET_ID_2 = "8"
FIXED_IP_ID_1 = "6"
FIXED_IP_ID_2 = "8"
NETWORK_ID_1 = "7"
NETWORK_ID_2 = "10"
IP_ADDRESS_1 = "10.0.0.2"
IP_ADDRESS_2 = "12.0.0.2"
AMPHORA_ID = "1"
LB_ID = "2"
COMPUTE_ID = "3"
ACTIVE = "ACTIVE"
LB_NET_IP = "10.0.0.2"
LB_NET_PORT_ID = "6"
HA_PORT_ID = "8"
HA_IP = "12.0.0.2"
def setUp(self):
super(TestAllowedAddressPairsDriver, self).setUp()
with mock.patch('octavia.common.clients.neutron_client.Client',
@ -468,3 +485,34 @@ class TestAllowedAddressPairsDriver(base.TestCase):
self.driver.update_vip(lb)
delete_rule.assert_called_once_with('ssh-rule')
self.assertFalse(create_rule.called)
def test_failover_preparation(self):
ports = {"ports": [
{"fixed_ips": [{"subnet_id": self.SUBNET_ID_1,
"ip_address": self.IP_ADDRESS_1}],
"id": self.FIXED_IP_ID_1, "network_id": self.NETWORK_ID_1},
{"fixed_ips": [{"subnet_id": self.SUBNET_ID_2,
"ip_address": self.IP_ADDRESS_2}],
"id": self.FIXED_IP_ID_2, "network_id": self.NETWORK_ID_2}]}
self.driver.neutron_client.list_ports.return_value = ports
self.driver.neutron_client.show_port = mock.Mock(
side_effect=self._failover_show_port_side_effect)
port_update = self.driver.neutron_client.update_port
amphora = data_models.Amphora(
id=self.AMPHORA_ID, load_balancer_id=self.LB_ID,
compute_id=self.COMPUTE_ID, status=self.ACTIVE,
lb_network_ip=self.LB_NET_IP, ha_port_id=self.HA_PORT_ID,
ha_ip=self.HA_IP)
self.driver.failover_preparation(amphora)
port_update.assert_called_once_with(ports["ports"][1].get("id"),
{'port': {'device_id': ''}})
def _failover_show_port_side_effect(self, port_id):
if port_id == self.LB_NET_PORT_ID:
return {"fixed_ips": [{"subnet_id": self.SUBNET_ID_1,
"ip_address": self.IP_ADDRESS_1}],
"id": self.FIXED_IP_ID_1, "network_id": self.NETWORK_ID_1}
if port_id == self.HA_PORT_ID:
return {"fixed_ips": [{"subnet_id": self.SUBNET_ID_2,
"ip_address": self.IP_ADDRESS_2}],
"id": self.FIXED_IP_ID_2, "network_id": self.NETWORK_ID_2}

View File

@ -206,6 +206,15 @@ The Controller Worker library will provide the following methods:
"""
raise NotImplementedError
def failover_amphora(self, amphora_id):
"""Failover an amphora
:param amp_id: ID of the amphora to fail over
:returns: None
:raises AmphoraNotFound: The referenced Amphora was not found
"""
raise NotImplementedError
Alternatives
------------
This code could be included in the Queue Consumer component of the controller.

View File

@ -236,6 +236,13 @@ class AbstractNetworkDriver
* returns = Port data model
* raises NetworkException, PortNotFound
* failover_preparation(amphora):
* Prepare an amphora for failover
* amphora = amphora data model
* returns = None
* raises PortNotFound
Alternatives
------------