From 2cfcf3eff1f1cd48b826284f0c65994c36996476 Mon Sep 17 00:00:00 2001 From: Trevor Vardeman Date: Wed, 15 Jul 2015 16:52:12 -0700 Subject: [PATCH] Adding amphora failover flows Added a flow to complete automated failover for an amphora. Added new tasks to retrieve ports, change port device ids for re-allocating to new amphora, and added functionality to include existing ports on amphora creation. Co-Authored-By: Brandon Logan Co-Authored-By: Michael Johnson Change-Id: Ic0d3f2b9a48ebf66df78e1ef98aef92ad7ee5057 --- .../drivers/haproxy/rest_api_driver.py | 53 ++--- .../amphorae/drivers/haproxy/ssh_driver.py | 66 +++--- octavia/common/constants.py | 6 + octavia/compute/compute_base.py | 4 +- octavia/compute/drivers/nova_driver.py | 17 +- .../controller/worker/controller_worker.py | 19 ++ .../controller/worker/flows/amphora_flows.py | 92 +++++++++ .../worker/tasks/amphora_driver_tasks.py | 56 +++++- .../controller/worker/tasks/compute_tasks.py | 18 +- .../controller/worker/tasks/database_tasks.py | 44 ++++ .../controller/worker/tasks/network_tasks.py | 190 ++++++++++++------ octavia/network/base.py | 10 + .../drivers/neutron/allowed_address_pairs.py | 22 ++ octavia/network/drivers/noop_driver/driver.py | 7 + .../unit/compute/drivers/test_nova_driver.py | 3 +- .../worker/flows/test_amphora_flows.py | 21 ++ .../worker/tasks/test_amphora_driver_tasks.py | 6 +- .../worker/tasks/test_compute_tasks.py | 19 +- .../worker/tasks/test_network_tasks.py | 15 +- .../worker/test_controller_worker.py | 31 +++ .../neutron/test_allowed_address_pairs.py | 48 +++++ specs/version0.5/controller-worker.rst | 9 + specs/version0.5/network-driver-interface.rst | 7 + 23 files changed, 606 insertions(+), 157 deletions(-) diff --git a/octavia/amphorae/drivers/haproxy/rest_api_driver.py b/octavia/amphorae/drivers/haproxy/rest_api_driver.py index 11bf4de423..3c50990570 100644 --- a/octavia/amphorae/drivers/haproxy/rest_api_driver.py +++ b/octavia/amphorae/drivers/haproxy/rest_api_driver.py @@ -26,6 +26,7 @@ from octavia.amphorae.drivers import driver_base as driver_base from octavia.amphorae.drivers.haproxy import exceptions as exc from octavia.amphorae.drivers.haproxy.jinja import jinja_cfg from octavia.common.config import cfg +from octavia.common import constants from octavia.common import data_models as data_models from octavia.common.tls_utils import cert_parser from octavia.i18n import _LW @@ -66,20 +67,22 @@ class HaproxyAmphoraLoadBalancerDriver(driver_base.AmphoraLoadBalancerDriver): certs['sni_certs']) for amp in listener.load_balancer.amphorae: - self.client.upload_config(amp, listener.id, config) - # todo (german): add a method to REST interface to reload or start - # without having to check - # Is that listener running? - r = self.client.get_listener_status(amp, - listener.id) - if r['status'] == 'ACTIVE': - self.client.reload_listener(amp, listener.id) - else: - self.client.start_listener(amp, listener.id) + if amp.status != constants.DELETED: + self.client.upload_config(amp, listener.id, config) + # todo (german): add a method to REST interface to reload or + # start without having to check + # Is that listener running? + r = self.client.get_listener_status(amp, + listener.id) + if r['status'] == 'ACTIVE': + self.client.reload_listener(amp, listener.id) + else: + self.client.start_listener(amp, listener.id) def _apply(self, func, listener=None, *args): for amp in listener.load_balancer.amphorae: - func(amp, listener.id, *args) + if amp.status != constants.DELETED: + func(amp, listener.id, *args) def stop(self, listener, vip): self._apply(self.client.stop_listener, listener) @@ -101,19 +104,21 @@ class HaproxyAmphoraLoadBalancerDriver(driver_base.AmphoraLoadBalancerDriver): def post_vip_plug(self, load_balancer, amphorae_network_config): for amp in load_balancer.amphorae: - subnet = amphorae_network_config.get(amp.id).vip_subnet - # NOTE(blogan): using the vrrp port here because that is what the - # allowed address pairs network driver sets this particular port - # to. This does expose a bit of tight coupling between the network - # driver and amphora driver. We will need to revisit this to - # try and remove this tight coupling. - port = amphorae_network_config.get(amp.id).vrrp_port - net_info = {'subnet_cidr': subnet.cidr, - 'gateway': subnet.gateway_ip, - 'mac_address': port.mac_address} - self.client.plug_vip(amp, - load_balancer.vip.ip_address, - net_info) + if amp.status != constants.DELETED: + subnet = amphorae_network_config.get(amp.id).vip_subnet + # NOTE(blogan): using the vrrp port here because that + # is what the allowed address pairs network driver sets + # this particular port to. This does expose a bit of + # tight coupling between the network driver and amphora + # driver. We will need to revisit this to try and remove + # this tight coupling. + port = amphorae_network_config.get(amp.id).vrrp_port + net_info = {'subnet_cidr': subnet.cidr, + 'gateway': subnet.gateway_ip, + 'mac_address': port.mac_address} + self.client.plug_vip(amp, + load_balancer.vip.ip_address, + net_info) def post_network_plug(self, amphora, port): port_info = {'mac_address': port.mac_address} diff --git a/octavia/amphorae/drivers/haproxy/ssh_driver.py b/octavia/amphorae/drivers/haproxy/ssh_driver.py index 80baa2273b..450f482a43 100644 --- a/octavia/amphorae/drivers/haproxy/ssh_driver.py +++ b/octavia/amphorae/drivers/haproxy/ssh_driver.py @@ -25,6 +25,7 @@ from octavia.amphorae.driver_exceptions import exceptions as exc from octavia.amphorae.drivers import driver_base as driver_base from octavia.amphorae.drivers.haproxy.jinja import jinja_cfg from octavia.common.config import cfg +from octavia.common import constants from octavia.common.tls_utils import cert_parser from octavia.i18n import _LW @@ -182,21 +183,21 @@ class HaproxyManager(driver_base.AmphoraLoadBalancerDriver): load_balancer.id) for amp in load_balancer.amphorae: - # Connect to amphora - self._connect(hostname=amp.lb_network_ip) + if amp.status != constants.DELETED: + # Connect to amphora + self._connect(hostname=amp.lb_network_ip) - mac = amphorae_network_config.get(amp.id).vrrp_port.mac_address - stdout, _ = self._execute_command( - CMD_GREP_LINK_BY_MAC.format(mac_address=mac)) - iface = stdout[:-2] - if not iface: - self.client.close() - continue - self._configure_amp_interface( - iface, secondary_ip=load_balancer.vip.ip_address) - self._configure_amp_routes( - iface, amphorae_network_config.get(amp.id)) - self.client.close() + mac = amphorae_network_config.get(amp.id).vrrp_port.mac_address + stdout, _ = self._execute_command( + CMD_GREP_LINK_BY_MAC.format(mac_address=mac)) + iface = stdout[:-2] + if not iface: + self.client.close() + continue + self._configure_amp_interface( + iface, secondary_ip=load_balancer.vip.ip_address) + self._configure_amp_routes( + iface, amphorae_network_config.get(amp.id)) def post_network_plug(self, amphora, port): self._connect(hostname=amphora.lb_network_ip) @@ -278,27 +279,28 @@ class HaproxyManager(driver_base.AmphoraLoadBalancerDriver): temps.append(temp) for amp in amphorae: - # Connect to amphora - self._connect(hostname=amp.lb_network_ip) + if amp.status != constants.DELETED: + # Connect to amphora + self._connect(hostname=amp.lb_network_ip) - # Setup for file upload - if make_dir: - mkdir_cmd = 'mkdir -p {0}'.format(make_dir) - self._execute_command(mkdir_cmd, run_as_root=True) - chown_cmd = 'chown -R {0} {1}'.format( - self.amp_config.username, make_dir) - self._execute_command(chown_cmd, run_as_root=True) + # Setup for file upload + if make_dir: + mkdir_cmd = 'mkdir -p {0}'.format(make_dir) + self._execute_command(mkdir_cmd, run_as_root=True) + chown_cmd = 'chown -R {0} {1}'.format( + self.amp_config.username, make_dir) + self._execute_command(chown_cmd, run_as_root=True) - # Upload files to location - if temps: - sftp = self.client.open_sftp() - for temp in temps: - sftp.put(temp.name, upload_dir) + # Upload files to location + if temps: + sftp = self.client.open_sftp() + for temp in temps: + sftp.put(temp.name, upload_dir) - # Execute remaining commands - for command in commands: - self._execute_command(command, run_as_root=True) - self.client.close() + # Execute remaining commands + for command in commands: + self._execute_command(command, run_as_root=True) + self.client.close() # Close the temp file for temp in temps: diff --git a/octavia/common/constants.py b/octavia/common/constants.py index 8803e0ae7d..40f2541ca6 100644 --- a/octavia/common/constants.py +++ b/octavia/common/constants.py @@ -80,11 +80,14 @@ SUPPORTED_AMPHORA_TYPES = (AMPHORA_VM,) # Task/Flow constants AMPHORA = 'amphora' +FAILOVER_AMPHORA = 'failover_amphora' AMPHORAE = 'amphorae' AMPHORA_ID = 'amphora_id' +FAILOVER_AMPHORA_ID = 'failover_amphora_id' DELTA = 'delta' DELTAS = 'deltas' LISTENER = 'listener' +LISTENERS = 'listeners' LOADBALANCER = 'loadbalancer' LOADBALANCER_ID = 'loadbalancer_id' COMPUTE_ID = 'compute_id' @@ -99,6 +102,8 @@ SERVER_PEM = 'server_pem' VIP_NETWORK = 'vip_network' AMPHORAE_NETWORK_CONFIG = 'amphorae_network_config' ADDED_PORTS = 'added_ports' +PORTS = 'ports' +MEMBER_PORTS = 'member_ports' CREATE_AMPHORA_FLOW = 'octavia-create-amphora-flow' CREATE_AMPHORA_FOR_LB_FLOW = 'octavia-create-amp-for-lb-flow' @@ -119,6 +124,7 @@ UPDATE_LISTENER_FLOW = 'octavia-update-listener-flow' UPDATE_LOADBALANCER_FLOW = 'octavia-update-loadbalancer-flow' UPDATE_MEMBER_FLOW = 'octavia-update-member-flow' UPDATE_POOL_FLOW = 'octavia-update-pool-flow' +FAILOVER_AMPHORA_FLOW = 'octavia-failover-amphora-flow' # Task Names RELOAD_LB_AFTER_AMP_ASSOC = 'reload-lb-after-amp-assoc' diff --git a/octavia/compute/compute_base.py b/octavia/compute/compute_base.py index 8c741d42e8..1d7e0c7e1c 100644 --- a/octavia/compute/compute_base.py +++ b/octavia/compute/compute_base.py @@ -47,10 +47,10 @@ class ComputeBase(object): pass @abc.abstractmethod - def delete(self, amphora_id): + def delete(self, compute_id): """Delete the specified amphora - :param amphora_id: The id of the amphora to delete + :param compute_id: The id of the amphora to delete """ pass diff --git a/octavia/compute/drivers/nova_driver.py b/octavia/compute/drivers/nova_driver.py index d091a9c1d9..25c3b2880b 100644 --- a/octavia/compute/drivers/nova_driver.py +++ b/octavia/compute/drivers/nova_driver.py @@ -40,7 +40,7 @@ class VirtualMachineManager(compute_base.ComputeBase): def build(self, name="amphora_name", amphora_flavor=None, image_id=None, key_name=None, sec_groups=None, network_ids=None, - config_drive_files=None, user_data=None): + port_ids=None, config_drive_files=None, user_data=None): '''Create a new virtual machine. :param name: optional name for amphora @@ -49,6 +49,7 @@ class VirtualMachineManager(compute_base.ComputeBase): :param key_name: keypair to add to the virtual machine :param sec_groups: Security group IDs for virtual machine :param network_ids: Network IDs to include on virtual machine + :param port_ids: Port IDs to include on virtual machine :param config_drive_files: An optional dict of files to overwrite on the server upon boot. Keys are file names (i.e. /etc/passwd) and values are the file contents (either as a string or as @@ -63,9 +64,13 @@ class VirtualMachineManager(compute_base.ComputeBase): ''' try: + network_ids = network_ids or [] + port_ids = port_ids or [] nics = [] - for net_id in network_ids: - nics.append({"net-id": net_id}) + if network_ids: + nics.extend([{"net-id": net_id} for net_id in network_ids]) + if port_ids: + nics.extend([{"port-id": port_id} for port_id in port_ids]) amphora = self.manager.create( name=name, image=image_id, flavor=amphora_flavor, @@ -81,13 +86,13 @@ class VirtualMachineManager(compute_base.ComputeBase): LOG.exception(_LE("Error building nova virtual machine.")) raise exceptions.ComputeBuildException() - def delete(self, amphora_id): + def delete(self, compute_id): '''Delete a virtual machine. - :param amphora_id: virtual machine UUID + :param compute_id: virtual machine UUID ''' try: - self.manager.delete(server=amphora_id) + self.manager.delete(server=compute_id) except Exception: LOG.exception(_LE("Error deleting nova virtual machine.")) raise exceptions.ComputeDeleteException() diff --git a/octavia/controller/worker/controller_worker.py b/octavia/controller/worker/controller_worker.py index c5090acfd2..31201b5548 100644 --- a/octavia/controller/worker/controller_worker.py +++ b/octavia/controller/worker/controller_worker.py @@ -458,3 +458,22 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): with tf_logging.DynamicLoggingListener(update_pool_tf, log=LOG): update_pool_tf.run() + + def failover_amphora(self, amphora_id): + """Perform failover operations for an amphora. + + :param amphora_id: ID for amphora to failover + :returns: None + :raises AmphoraNotFound: The referenced amphora was not found + """ + + amp = self._amphora_repo.get(db_apis.get_session(), + id=amphora_id) + + failover_amphora_tf = self._taskflow_load( + self._amphora_flows.get_failover_flow(), + store={constants.AMPHORA: amp, + constants.LOADBALANCER_ID: amp.load_balancer_id}) + with tf_logging.DynamicLoggingListener(failover_amphora_tf, + log=LOG): + failover_amphora_tf.run() diff --git a/octavia/controller/worker/flows/amphora_flows.py b/octavia/controller/worker/flows/amphora_flows.py index f5a366ccac..f270fd9d18 100644 --- a/octavia/controller/worker/flows/amphora_flows.py +++ b/octavia/controller/worker/flows/amphora_flows.py @@ -23,6 +23,7 @@ from octavia.controller.worker.tasks import amphora_driver_tasks from octavia.controller.worker.tasks import cert_task from octavia.controller.worker.tasks import compute_tasks from octavia.controller.worker.tasks import database_tasks +from octavia.controller.worker.tasks import network_tasks CONF = cfg.CONF @@ -153,3 +154,94 @@ class AmphoraFlows(object): MarkAmphoraDeletedInDB( requires='amphora')) return delete_amphora_flow + + def get_failover_flow(self): + """Creates a flow to failover a stale amphora + + :returns: The flow for amphora failover + """ + + failover_amphora_flow = linear_flow.Flow( + constants.FAILOVER_AMPHORA_FLOW) + failover_amphora_flow.add( + network_tasks.RetrievePortIDsOnAmphoraExceptLBNetwork( + requires=constants.AMPHORA, provides=constants.PORTS)) + failover_amphora_flow.add(network_tasks.FailoverPreparationForAmphora( + requires=constants.AMPHORA)) + failover_amphora_flow.add(compute_tasks.ComputeDelete( + requires=constants.AMPHORA)) + failover_amphora_flow.add(database_tasks.MarkAmphoraDeletedInDB( + requires=constants.AMPHORA)) + failover_amphora_flow.add(database_tasks.CreateAmphoraInDB( + provides=constants.AMPHORA_ID)) + failover_amphora_flow.add( + database_tasks.GetUpdatedFailoverAmpNetworkDetailsAsList( + requires=(constants.AMPHORA_ID, constants.AMPHORA), + provides=constants.AMPS_DATA)) + if self.REST_AMPHORA_DRIVER: + failover_amphora_flow.add(cert_task.GenerateServerPEMTask( + provides=constants.SERVER_PEM)) + failover_amphora_flow.add(compute_tasks.CertComputeCreate( + requires=(constants.AMPHORA_ID, constants.SERVER_PEM), + provides=constants.COMPUTE_ID)) + else: + failover_amphora_flow.add(compute_tasks.ComputeCreate( + requires=constants.AMPHORA_ID, provides=constants.COMPUTE_ID)) + failover_amphora_flow.add(database_tasks.UpdateAmphoraComputeId( + requires=(constants.AMPHORA_ID, constants.COMPUTE_ID))) + failover_amphora_flow.add( + database_tasks.AssociateFailoverAmphoraWithLBID( + requires=(constants.AMPHORA_ID, constants.LOADBALANCER_ID))) + failover_amphora_flow.add(database_tasks.MarkAmphoraBootingInDB( + requires=(constants.AMPHORA_ID, constants.COMPUTE_ID))) + wait_flow = linear_flow.Flow('wait_for_amphora', + retry=retry.Times(CONF. + controller_worker. + amp_active_retries)) + wait_flow.add(compute_tasks.ComputeWait( + requires=constants.COMPUTE_ID, + provides=constants.COMPUTE_OBJ)) + wait_flow.add(database_tasks.UpdateAmphoraInfo( + requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ), + provides=constants.AMPHORA)) + failover_amphora_flow.add(wait_flow) + failover_amphora_flow.add(database_tasks.ReloadAmphora( + requires=constants.AMPHORA_ID, + provides=constants.FAILOVER_AMPHORA)) + failover_amphora_flow.add(amphora_driver_tasks.AmphoraFinalize( + rebind={constants.AMPHORA: constants.FAILOVER_AMPHORA}, + requires=constants.AMPHORA)) + failover_amphora_flow.add(database_tasks.UpdateAmphoraVIPData( + requires=constants.AMPS_DATA)) + failover_amphora_flow.add(database_tasks.ReloadLoadBalancer( + requires=constants.LOADBALANCER_ID, + provides=constants.LOADBALANCER)) + failover_amphora_flow.add(network_tasks.GetAmphoraeNetworkConfigs( + requires=constants.LOADBALANCER, + provides=constants.AMPHORAE_NETWORK_CONFIG)) + failover_amphora_flow.add(database_tasks.GetListenersFromLoadbalancer( + requires=constants.LOADBALANCER, provides=constants.LISTENERS)) + failover_amphora_flow.add(database_tasks.GetVipFromLoadbalancer( + requires=constants.LOADBALANCER, provides=constants.VIP)) + failover_amphora_flow.add(amphora_driver_tasks.ListenersUpdate( + requires=(constants.LISTENERS, constants.VIP))) + failover_amphora_flow.add(amphora_driver_tasks.AmphoraPostVIPPlug( + requires=(constants.LOADBALANCER, + constants.AMPHORAE_NETWORK_CONFIG))) + failover_amphora_flow.add( + network_tasks.GetMemberPorts( + rebind={constants.AMPHORA: constants.FAILOVER_AMPHORA}, + requires=(constants.LOADBALANCER, constants.AMPHORA), + provides=constants.MEMBER_PORTS + )) + failover_amphora_flow.add(amphora_driver_tasks.AmphoraPostNetworkPlug( + rebind={constants.AMPHORA: constants.FAILOVER_AMPHORA, + constants.PORTS: constants.MEMBER_PORTS}, + requires=(constants.AMPHORA, constants.PORTS))) + failover_amphora_flow.add(amphora_driver_tasks.ListenersStart( + requires=(constants.LISTENERS, constants.VIP))) + failover_amphora_flow.add(database_tasks.MarkAmphoraAllocatedInDB( + rebind={constants.AMPHORA: constants.FAILOVER_AMPHORA}, + requires=(constants.AMPHORA, constants.LOADBALANCER_ID))) + + return failover_amphora_flow diff --git a/octavia/controller/worker/tasks/amphora_driver_tasks.py b/octavia/controller/worker/tasks/amphora_driver_tasks.py index 10f072cd2e..64415b254e 100644 --- a/octavia/controller/worker/tasks/amphora_driver_tasks.py +++ b/octavia/controller/worker/tasks/amphora_driver_tasks.py @@ -62,6 +62,24 @@ class ListenerUpdate(BaseAmphoraTask): return None +class ListenersUpdate(BaseAmphoraTask): + """Task to update amphora with all listeners' configurations.""" + + def execute(self, listeners, vip): + """Execute updates per listener for an amphora.""" + for listener in listeners: + self.amphora_driver.update(listener, vip) + + def revert(self, listeners, *args, **kwargs): + """Handle failed listeners updates.""" + + LOG.warn(_LW("Reverting listeners updates.")) + for listener in listeners: + self.listener_repo.update(db_apis.get_session(), id=listener.id, + provisioning_status=constants.ERROR) + return None + + class ListenerStop(BaseAmphoraTask): """Task to stop the listener on the vip.""" @@ -96,6 +114,25 @@ class ListenerStart(BaseAmphoraTask): return None +class ListenersStart(BaseAmphoraTask): + """Task to start all listeners on the vip.""" + + def execute(self, listeners, vip): + """Execute listener start routines for listeners on an amphora.""" + for listener in listeners: + self.amphora_driver.start(listener, vip) + LOG.debug("Started the listeners on the vip") + + def revert(self, listeners, *args, **kwargs): + """Handle failed listeners starts.""" + + LOG.warn(_LW("Reverting listeners starts.")) + for listener in listeners: + self.listener_repo.update(db_apis.get_session(), id=listener.id, + provisioning_status=constants.ERROR) + return None + + class ListenerDelete(BaseAmphoraTask): """Task to delete the listener on the vip.""" @@ -148,10 +185,13 @@ class AmphoraFinalize(BaseAmphoraTask): class AmphoraPostNetworkPlug(BaseAmphoraTask): """Task to notify the amphora post network plug.""" - def execute(self, amphora): + def execute(self, amphora, ports): """Execute post_network_plug routine.""" - self.amphora_driver.post_network_plug(amphora) - LOG.debug("Posted network plug for the compute instance") + for port in ports: + self.amphora_driver.post_network_plug(amphora, port) + LOG.debug("post_network_plug called on compute instance " + "{compute_id} for port {port_id}".format( + compute_id=amphora.compute_id, port_id=port.id)) def revert(self, result, amphora, *args, **kwargs): """Handle a failed post network plug.""" @@ -167,16 +207,12 @@ class AmphoraePostNetworkPlug(BaseAmphoraTask): def execute(self, loadbalancer, added_ports): """Execute post_network_plug routine.""" + amp_post_plug = AmphoraPostNetworkPlug() for amphora in loadbalancer.amphorae: if amphora.id in added_ports: - for port in added_ports[amphora.id]: - self.amphora_driver.post_network_plug(amphora, port) - LOG.debug( - "post_network_plug called on compute instance " - "{compute_id} for port {port_id}".format( - compute_id=amphora.compute_id, port_id=port.id)) + amp_post_plug.execute(amphora, added_ports[amphora.id]) - def revert(self, result, loadbalancer, deltas, *args, **kwargs): + def revert(self, result, loadbalancer, added_ports, *args, **kwargs): """Handle a failed post network plug.""" if isinstance(result, failure.Failure): return diff --git a/octavia/controller/worker/tasks/compute_tasks.py b/octavia/controller/worker/tasks/compute_tasks.py index f8d3692ce8..da6cec3631 100644 --- a/octavia/controller/worker/tasks/compute_tasks.py +++ b/octavia/controller/worker/tasks/compute_tasks.py @@ -46,7 +46,7 @@ class BaseComputeTask(task.Task): class ComputeCreate(BaseComputeTask): """Create the compute instance for a new amphora.""" - def execute(self, amphora_id, config_drive_files=None): + def execute(self, amphora_id, ports=None, config_drive_files=None): """Create an amphora :returns: an amphora @@ -63,6 +63,7 @@ class ComputeCreate(BaseComputeTask): key_name=CONF.controller_worker.amp_ssh_key_name, sec_groups=CONF.controller_worker.amp_secgroup_list, network_ids=[CONF.controller_worker.amp_network], + port_ids=[port.id for port in ports] if ports else [], config_drive_files=config_drive_files) LOG.debug("Server created with id: %s for amphora id: %s" % @@ -94,7 +95,7 @@ class ComputeCreate(BaseComputeTask): class CertComputeCreate(ComputeCreate): - def execute(self, amphora_id, server_pem): + def execute(self, amphora_id, server_pem, ports=None): """Create an amphora :returns: an amphora @@ -106,8 +107,8 @@ class CertComputeCreate(ComputeCreate): # '/etc/octavia/octavia.conf' '/etc/octavia/certs/server.pem': server_pem, '/etc/octavia/certs/client_ca.pem': client_ca} - return super(CertComputeCreate, self).execute(amphora_id, - config_drive_files) + return super(CertComputeCreate, self).execute( + amphora_id, ports=ports, config_drive_files=config_drive_files) class DeleteAmphoraeOnLoadBalancer(BaseComputeTask): @@ -119,7 +120,7 @@ class DeleteAmphoraeOnLoadBalancer(BaseComputeTask): def execute(self, loadbalancer): for amp in loadbalancer.amphorae: try: - self.compute.delete(amphora_id=amp.compute_id) + self.compute.delete(compute_id=amp.compute_id) except Exception as e: LOG.error(_LE("Nova delete for amphora id: %(amp)s failed:" "%(exp)s"), {'amp': amp.id, 'exp': e}) @@ -128,14 +129,13 @@ class DeleteAmphoraeOnLoadBalancer(BaseComputeTask): class ComputeDelete(BaseComputeTask): def execute(self, amphora): - compute_id = amphora.compute_id - LOG.debug("Nova Delete execute for amphora with id %s" % compute_id) + LOG.debug("Nova Delete execute for amphora with id %s" % amphora.id) try: - self.compute.delete(amphora_id=compute_id) + self.compute.delete(compute_id=amphora.compute_id) except Exception as e: LOG.error(_LE("Nova delete for amphora id: %(amp)s failed:" - "%(exp)s"), {'amp': compute_id, 'exp': e}) + "%(exp)s"), {'amp': amphora.id, 'exp': e}) raise e diff --git a/octavia/controller/worker/tasks/database_tasks.py b/octavia/controller/worker/tasks/database_tasks.py index 4452d782fd..87e1d1ea58 100644 --- a/octavia/controller/worker/tasks/database_tasks.py +++ b/octavia/controller/worker/tasks/database_tasks.py @@ -20,6 +20,7 @@ from taskflow import task from taskflow.types import failure from octavia.common import constants +from octavia.common import data_models from octavia.common import exceptions from octavia.db import api as db_apis from octavia.db import repositories as repo @@ -257,6 +258,18 @@ class UpdateAmphoraVIPData(BaseDatabaseTask): ha_port_id=amp_data.ha_port_id) +class AssociateFailoverAmphoraWithLBID(BaseDatabaseTask): + + def execute(self, amphora_id, loadbalancer_id): + self.repos.amphora.associate(db_apis.get_session(), + load_balancer_id=loadbalancer_id, + amphora_id=amphora_id) + + def revert(self, amphora_id): + self.repos.amphora.update(db_apis.get_session(), amphora_id, + loadbalancer_id=None) + + class MapLoadbalancerToAmphora(BaseDatabaseTask): """Maps and assigns a load balancer to an amphora in the database.""" @@ -812,3 +825,34 @@ class UpdatePoolInDB(BaseDatabaseTask): # TODO(johnsom) fix this to set the upper ojects to ERROR self.pool_repo.update(db_apis.get_session(), pool.id, enabled=0) + + +class GetUpdatedFailoverAmpNetworkDetailsAsList(BaseDatabaseTask): + """Task to retrieve amphora network details.""" + + def execute(self, amphora_id, amphora): + amp_net_configs = [data_models.Amphora( + id=amphora_id, + vrrp_ip=amphora.vrrp_ip, + ha_ip=amphora.ha_ip, + vrrp_port_id=amphora.vrrp_port_id, + ha_port_id=amphora.ha_port_id)] + return amp_net_configs + + +class GetListenersFromLoadbalancer(BaseDatabaseTask): + """Task to pull the listener from a loadbalancer.""" + + def execute(self, loadbalancer): + listeners = [] + for listener in loadbalancer.listeners: + l = self.listener_repo.get(db_apis.get_session(), id=listener.id) + listeners.append(l) + return listeners + + +class GetVipFromLoadbalancer(BaseDatabaseTask): + """Task to pull the vip from a loadbalancer.""" + + def execute(self, loadbalancer): + return loadbalancer.vip \ No newline at end of file diff --git a/octavia/controller/worker/tasks/network_tasks.py b/octavia/controller/worker/tasks/network_tasks.py index 738e93b1ab..bbb1312e78 100644 --- a/octavia/controller/worker/tasks/network_tasks.py +++ b/octavia/controller/worker/tasks/network_tasks.py @@ -24,7 +24,7 @@ from taskflow.types import failure from octavia.common import constants from octavia.i18n import _LW, _LE from octavia.network import base -from octavia.network import data_models +from octavia.network import data_models as n_data_models LOG = logging.getLogger(__name__) CONF = cfg.CONF @@ -45,6 +45,50 @@ class BaseNetworkTask(task.Task): ).driver +class CalculateAmphoraDelta(BaseNetworkTask): + + default_provides = constants.DELTA + + def execute(self, loadbalancer, amphora): + LOG.debug("Calculating network delta for amphora id: %s" % amphora.id) + + # Figure out what networks we want + # seed with lb network(s) + subnet = self.network_driver.get_subnet(loadbalancer.vip.subnet_id) + desired_network_ids = {CONF.controller_worker.amp_network, + subnet.network_id} + + if not loadbalancer.listeners: + return None + + for listener in loadbalancer.listeners: + if (not listener.default_pool) or ( + not listener.default_pool.members): + continue + member_networks = [ + self.network_driver.get_subnet(member.subnet_id).network_id + for member in listener.default_pool.members + if member.subnet_id + ] + desired_network_ids.update(member_networks) + + nics = self.network_driver.get_plugged_networks(amphora.compute_id) + # assume we don't have two nics in the same network + actual_network_nics = dict((nic.network_id, nic) for nic in nics) + + del_ids = set(actual_network_nics) - desired_network_ids + delete_nics = list( + actual_network_nics[net_id] for net_id in del_ids) + + add_ids = desired_network_ids - set(actual_network_nics) + add_nics = list(n_data_models.Interface( + network_id=net_id) for net_id in add_ids) + delta = n_data_models.Delta( + amphora_id=amphora.id, compute_id=amphora.compute_id, + add_nics=add_nics, delete_nics=delete_nics) + return delta + + class CalculateDelta(BaseNetworkTask): """Task to calculate the delta between @@ -59,53 +103,17 @@ class CalculateDelta(BaseNetworkTask): """Compute which NICs need to be plugged for the amphora to become operational. - :param amphora - the amphora configuration we - want to achieve - :param nics - the nics on the real amphora - :returns the delta + :param loadbalancer: the loadbalancer to calculate deltas for all + amphorae + :returns: dict of octavia.network.data_models.Delta keyed off amphora + id """ + calculate_amp = CalculateAmphoraDelta() deltas = {} for amphora in loadbalancer.amphorae: - - LOG.debug("Calculating network delta for amphora id: %s" - % amphora.id) - - # Figure out what networks we want - # seed with lb network(s) - subnet = self.network_driver.get_subnet(loadbalancer.vip.subnet_id) - desired_network_ids = {CONF.controller_worker.amp_network, - subnet.network_id} - - if not loadbalancer.listeners: - return {} - - for listener in loadbalancer.listeners: - if (not listener.default_pool) or ( - not listener.default_pool.members): - continue - member_networks = [ - self.network_driver.get_subnet(member.subnet_id).network_id - for member in listener.default_pool.members - if member.subnet_id - ] - desired_network_ids.update(member_networks) - - nics = self.network_driver.get_plugged_networks(amphora.compute_id) - # assume we don't have two nics in the same network - actual_network_nics = dict((nic.network_id, nic) for nic in nics) - - del_ids = set(actual_network_nics) - desired_network_ids - delete_nics = list( - actual_network_nics[net_id] for net_id in del_ids) - - add_ids = desired_network_ids - set(actual_network_nics) - add_nics = list(data_models.Interface( - network_id=net_id) for net_id in add_ids) - deltas[amphora.id] = data_models.Delta( - amphora_id=amphora.id, compute_id=amphora.compute_id, - add_nics=add_nics, delete_nics=delete_nics) - + delta = calculate_amp.execute(loadbalancer, amphora) + deltas[amphora.id] = delta return deltas @@ -190,6 +198,25 @@ class UnPlugNetworks(BaseNetworkTask): pass # Todo(german) follow up if that makes sense +class GetMemberPorts(BaseNetworkTask): + + def execute(self, loadbalancer, amphora): + vip_port = self.network_driver.get_port(loadbalancer.vip.port_id) + member_ports = [] + interfaces = self.network_driver.get_plugged_networks( + amphora.compute_id) + for interface in interfaces: + port = self.network_driver.get_port(interface.port_id) + if vip_port.network_id == port.network_id: + continue + port.network = self.network_driver.get_network(port.network_id) + for fixed_ip in port.fixed_ips: + fixed_ip.subnet = self.network_driver.get_subnet( + fixed_ip.subnet_id) + member_ports.append(port) + return member_ports + + class HandleNetworkDeltas(BaseNetworkTask): """Task to plug and unplug networks @@ -275,9 +302,9 @@ class UnplugVIP(BaseNetworkTask): try: self.network_driver.unplug_vip(loadbalancer, loadbalancer.vip) except Exception as e: - LOG.error(_LE("Unable to unplug vip from load balancer %(id)s: " - "exception: %(ex)s"), id=loadbalancer.id, - ex=e.message) + LOG.error(_LE("Unable to unplug vip from load balancer" + "{id}: exception: {ex}").format(id=loadbalancer.id, + ex=e.message)) class AllocateVIP(BaseNetworkTask): @@ -334,21 +361,56 @@ class GetAmphoraeNetworkConfigs(BaseNetworkTask): vip_port = self.network_driver.get_port(loadbalancer.vip.port_id) amp_net_configs = {} for amp in loadbalancer.amphorae: - LOG.debug("Retrieving network details for " - "amphora {0}".format(amp.id)) - vrrp_port = self.network_driver.get_port(amp.vrrp_port_id) - vrrp_subnet = self.network_driver.get_subnet( - vrrp_port.get_subnet_id(amp.vrrp_ip)) - ha_port = self.network_driver.get_port(amp.ha_port_id) - ha_subnet = self.network_driver.get_subnet( - ha_port.get_subnet_id(amp.ha_ip)) - amp_net_configs[amp.id] = data_models.AmphoraNetworkConfig( - amphora=amp, - vip_subnet=vip_subnet, - vip_port=vip_port, - vrrp_subnet=vrrp_subnet, - vrrp_port=vrrp_port, - ha_subnet=ha_subnet, - ha_port=ha_port - ) + if amp.status != constants.DELETED: + LOG.debug("Retrieving network details for " + "amphora {0}".format(amp.id)) + vrrp_port = self.network_driver.get_port(amp.vrrp_port_id) + vrrp_subnet = self.network_driver.get_subnet( + vrrp_port.get_subnet_id(amp.vrrp_ip)) + ha_port = self.network_driver.get_port(amp.ha_port_id) + ha_subnet = self.network_driver.get_subnet( + ha_port.get_subnet_id(amp.ha_ip)) + amp_net_configs[amp.id] = n_data_models.AmphoraNetworkConfig( + amphora=amp, + vip_subnet=vip_subnet, + vip_port=vip_port, + vrrp_subnet=vrrp_subnet, + vrrp_port=vrrp_port, + ha_subnet=ha_subnet, + ha_port=ha_port + ) return amp_net_configs + + +class FailoverPreparationForAmphora(BaseNetworkTask): + """Task to prepare an amphora for failover.""" + + def execute(self, amphora): + LOG.debug("Prepare amphora %s for failover." % amphora.id) + + self.network_driver.failover_preparation(amphora) + + +class RetrievePortIDsOnAmphoraExceptLBNetwork(BaseNetworkTask): + """Task retrieving all the port ids on an amphora, except lb network.""" + + def execute(self, amphora): + LOG.debug("Retrieve all but the lb network port id on amphora %s." % + amphora.id) + + interfaces = self.network_driver.get_plugged_networks( + compute_id=amphora.compute_id) + + ports = [] + for interface_ in interfaces: + if interface_.port_id not in ports: + port = self.network_driver.get_port(port_id=interface_.port_id) + ips = port.fixed_ips + lb_network = False + for ip in ips: + if ip.ip_address == amphora.lb_network_ip: + lb_network = True + if not lb_network: + ports.append(port) + + return ports diff --git a/octavia/network/base.py b/octavia/network/base.py index 0ddfcf0644..aea411ca34 100644 --- a/octavia/network/base.py +++ b/octavia/network/base.py @@ -214,3 +214,13 @@ class AbstractNetworkDriver(object): :raises: NetworkException, PortNotFound """ pass + + @abc.abstractmethod + def failover_preparation(self, amphora): + """Prepare an amphora for failover. + + :param amphora: amphora object to failover + :return: None + :raises: PortNotFound + """ + pass diff --git a/octavia/network/drivers/neutron/allowed_address_pairs.py b/octavia/network/drivers/neutron/allowed_address_pairs.py index 4cc2327c4f..0662724137 100644 --- a/octavia/network/drivers/neutron/allowed_address_pairs.py +++ b/octavia/network/drivers/neutron/allowed_address_pairs.py @@ -272,6 +272,7 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver): msg = ('Amphora with compute id {compute_id} does not have any ' 'plugged networks').format(compute_id=compute_id) raise base.AmphoraNotFound(msg) + unpluggers = self._get_interfaces_to_unplug(interfaces, network_id, ip_address=ip_address) try: @@ -299,3 +300,24 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver): def update_vip(self, load_balancer): sec_grp = self._get_lb_security_group(load_balancer.id) self._update_security_group_rules(load_balancer, sec_grp.get('id')) + + def failover_preparation(self, amphora): + interfaces = self.get_plugged_networks(compute_id=amphora.compute_id) + + ports = [] + for interface_ in interfaces: + port = self.get_port(port_id=interface_.port_id) + ips = port.fixed_ips + lb_network = False + for ip in ips: + if ip.ip_address == amphora.lb_network_ip: + lb_network = True + if not lb_network: + ports.append(port) + + for port in ports: + try: + self.neutron_client.update_port(port.id, + {'port': {'device_id': ''}}) + except neutron_client_exceptions.NotFound: + raise base.PortNotFound() diff --git a/octavia/network/drivers/noop_driver/driver.py b/octavia/network/drivers/noop_driver/driver.py index e8f9f27832..c9f3854744 100644 --- a/octavia/network/drivers/noop_driver/driver.py +++ b/octavia/network/drivers/noop_driver/driver.py @@ -92,6 +92,10 @@ class NoopManager(object): self.__class__.__name__, port_id) self.networkconfigconfig[port_id] = (port_id, 'get_port') + def failover_preparation(self, amphora): + LOG.debug("failover %s no-op, failover_preparation, amphora id %s", + self.__class__.__name__, amphora.id) + class NoopNetworkDriver(driver_base.AbstractNetworkDriver): def __init__(self, region=None): @@ -131,3 +135,6 @@ class NoopNetworkDriver(driver_base.AbstractNetworkDriver): def get_port(self, port_id): self.driver.get_port(port_id) + + def failover_preparation(self, amphora): + self.driver.failover_preparation(amphora) diff --git a/octavia/tests/unit/compute/drivers/test_nova_driver.py b/octavia/tests/unit/compute/drivers/test_nova_driver.py index 72e35f2b90..251a0e0f50 100644 --- a/octavia/tests/unit/compute/drivers/test_nova_driver.py +++ b/octavia/tests/unit/compute/drivers/test_nova_driver.py @@ -67,6 +67,7 @@ class TestNovaClient(base.TestCase): key_name=1, sec_groups=1, network_ids=[1], + port_ids=[2], user_data='Blah', config_drive_files='Files Blah') @@ -74,7 +75,7 @@ class TestNovaClient(base.TestCase): self.manager.manager.create.assert_called_with( name="amphora_name", - nics=[{'net-id': 1}], + nics=[{'net-id': 1}, {'port-id': 2}], image=1, flavor=1, key_name=1, diff --git a/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py b/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py index 030a46674d..cd4d4fadae 100644 --- a/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py +++ b/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py @@ -112,3 +112,24 @@ class TestAmphoraFlows(base.TestCase): self.assertEqual(len(amp_flow.provides), 0) self.assertEqual(len(amp_flow.requires), 1) + + def test_get_failover_flow(self): + + amp_flow = self.AmpFlow.get_failover_flow() + + self.assertIsInstance(amp_flow, flow.Flow) + + self.assertIn(constants.AMPHORA, amp_flow.requires) + self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires) + self.assertIn(constants.FAILOVER_AMPHORA, amp_flow.provides) + self.assertIn(constants.AMPHORA, amp_flow.provides) + self.assertIn(constants.AMPHORA_ID, amp_flow.provides) + self.assertIn(constants.COMPUTE_ID, amp_flow.provides) + self.assertIn(constants.COMPUTE_OBJ, amp_flow.provides) + self.assertIn(constants.AMPS_DATA, amp_flow.provides) + self.assertIn(constants.PORTS, amp_flow.provides) + self.assertIn(constants.LISTENERS, amp_flow.provides) + self.assertIn(constants.LOADBALANCER, amp_flow.provides) + + self.assertEqual(len(amp_flow.requires), 2) + self.assertEqual(len(amp_flow.provides), 12) diff --git a/octavia/tests/unit/controller/worker/tasks/test_amphora_driver_tasks.py b/octavia/tests/unit/controller/worker/tasks/test_amphora_driver_tasks.py index d1a703a48f..96a62cbd6b 100644 --- a/octavia/tests/unit/controller/worker/tasks/test_amphora_driver_tasks.py +++ b/octavia/tests/unit/controller/worker/tasks/test_amphora_driver_tasks.py @@ -39,6 +39,8 @@ _vip_mock = mock.MagicMock() _LB_mock = mock.MagicMock() _amphorae_mock = [_amphora_mock] _network_mock = mock.MagicMock() +_port_mock = mock.MagicMock() +_ports_mock = [_port_mock] @mock.patch('octavia.db.repositories.AmphoraRepository.update') @@ -204,10 +206,10 @@ class TestAmphoraDriverTasks(base.TestCase): amphora_post_network_plug_obj = (amphora_driver_tasks. AmphoraPostNetworkPlug()) - amphora_post_network_plug_obj.execute(_amphora_mock) + amphora_post_network_plug_obj.execute(_amphora_mock, _ports_mock) (mock_driver.post_network_plug. - assert_called_once_with)(_amphora_mock) + assert_called_once_with)(_amphora_mock, _port_mock) # Test revert amp = amphora_post_network_plug_obj.revert(None, _amphora_mock) diff --git a/octavia/tests/unit/controller/worker/tasks/test_compute_tasks.py b/octavia/tests/unit/controller/worker/tasks/test_compute_tasks.py index 1f1ff6fcc0..52bd0d6c60 100644 --- a/octavia/tests/unit/controller/worker/tasks/test_compute_tasks.py +++ b/octavia/tests/unit/controller/worker/tasks/test_compute_tasks.py @@ -43,6 +43,7 @@ AMP_WAIT = 12 AMPHORA_ID = uuidutils.generate_uuid() COMPUTE_ID = uuidutils.generate_uuid() LB_NET_IP = '192.0.2.1' +PORT_ID = uuidutils.generate_uuid() AUTH_VERSION = '2' @@ -55,9 +56,12 @@ class TestException(Exception): return repr(self.value) _amphora_mock = mock.MagicMock() +_amphora_mock.id = AMPHORA_ID _amphora_mock.compute_id = COMPUTE_ID _load_balancer_mock = mock.MagicMock() _load_balancer_mock.amphorae = [_amphora_mock] +_port = mock.MagicMock() +_port.id = PORT_ID class TestComputeTasks(base.TestCase): @@ -86,7 +90,7 @@ class TestComputeTasks(base.TestCase): mock_driver.build.side_effect = [COMPUTE_ID, TestException('test')] # Test execute() - compute_id = createcompute.execute(_amphora_mock.id) + compute_id = createcompute.execute(_amphora_mock.id, ports=[_port]) # Validate that the build method was called properly mock_driver.build.assert_called_once_with( @@ -96,6 +100,7 @@ class TestComputeTasks(base.TestCase): key_name=AMP_SSH_KEY_NAME, sec_groups=AMP_SEC_GROUPS, network_ids=[AMP_NET], + port_ids=[PORT_ID], config_drive_files=None) # Make sure it returns the expected compute_id @@ -105,7 +110,7 @@ class TestComputeTasks(base.TestCase): createcompute = compute_tasks.ComputeCreate() self.assertRaises(TestException, createcompute.execute, - _amphora_mock, 'test_cert') + _amphora_mock, config_drive_files='test_cert') # Test revert() @@ -131,7 +136,8 @@ class TestComputeTasks(base.TestCase): m = mock.mock_open(read_data='test') with mock.patch('%s.open' % BUILTINS, m, create=True): # Test execute() - compute_id = createcompute.execute(_amphora_mock.id, 'test_cert') + compute_id = createcompute.execute(_amphora_mock.id, + 'test_cert') # Validate that the build method was called properly mock_driver.build.assert_called_once_with( @@ -141,6 +147,7 @@ class TestComputeTasks(base.TestCase): key_name=AMP_SSH_KEY_NAME, sec_groups=AMP_SEC_GROUPS, network_ids=[AMP_NET], + port_ids=[], config_drive_files={ '/etc/octavia/certs/server.pem': 'test_cert', '/etc/octavia/certs/client_ca.pem': m.return_value}) @@ -153,7 +160,7 @@ class TestComputeTasks(base.TestCase): createcompute = compute_tasks.ComputeCreate() self.assertRaises(TestException, createcompute.execute, - _amphora_mock, 'test_cert') + _amphora_mock, config_drive_files='test_cert') # Test revert() @@ -201,7 +208,7 @@ class TestComputeTasks(base.TestCase): delete_amps = compute_tasks.DeleteAmphoraeOnLoadBalancer() delete_amps.execute(_load_balancer_mock) - mock_driver.delete.assert_called_once_with(amphora_id=COMPUTE_ID) + mock_driver.delete.assert_called_once_with(compute_id=COMPUTE_ID) @mock.patch('stevedore.driver.DriverManager.driver') def test_compute_delete(self, mock_driver): @@ -209,4 +216,4 @@ class TestComputeTasks(base.TestCase): delete_compute = compute_tasks.ComputeDelete() delete_compute.execute(_amphora_mock) - mock_driver.delete.assert_called_once_with(amphora_id=COMPUTE_ID) + mock_driver.delete.assert_called_once_with(compute_id=COMPUTE_ID) diff --git a/octavia/tests/unit/controller/worker/tasks/test_network_tasks.py b/octavia/tests/unit/controller/worker/tasks/test_network_tasks.py index 35fb7b7a5b..cd7a3cbb76 100644 --- a/octavia/tests/unit/controller/worker/tasks/test_network_tasks.py +++ b/octavia/tests/unit/controller/worker/tasks/test_network_tasks.py @@ -38,6 +38,11 @@ IP_ADDRESS = "172.24.41.1" VIP = o_data_models.Vip(port_id=PORT_ID, subnet_id=SUBNET_ID, ip_address=IP_ADDRESS) LB = o_data_models.LoadBalancer(vip=VIP) +FIRST_IP = {"ip_address": IP_ADDRESS, "subnet_id": SUBNET_ID} +FIXED_IPS = [FIRST_IP] +INTERFACE = data_models.Interface(id=uuidutils.generate_uuid(), + compute_id=COMPUTE_ID, fixed_ips=FIXED_IPS, + port_id=PORT_ID) class TestException(Exception): @@ -85,7 +90,8 @@ class TestNetworkTasks(base.TestCase): self.amphora_mock.load_balancer = self.load_balancer_mock self.load_balancer_mock.amphorae = [self.amphora_mock] self.load_balancer_mock.listeners = None - self.assertEqual(EMPTY, net.execute(self.load_balancer_mock)) + self.assertEqual({self.amphora_mock.id: None}, + net.execute(self.load_balancer_mock)) listener_mock = mock.MagicMock() self.load_balancer_mock.listeners = [listener_mock] @@ -354,3 +360,10 @@ class TestNetworkTasks(base.TestCase): lb = o_data_models.LoadBalancer(vip=vip) net.execute(lb) mock_driver.deallocate_vip.assert_called_once_with(lb.vip) + + def test_failover_preparation_for_amphora(self, mock_driver): + failover = network_tasks.FailoverPreparationForAmphora() + amphora = o_data_models.Amphora(id=AMPHORA_ID, + lb_network_ip=IP_ADDRESS) + failover.execute(amphora) + mock_driver.failover_preparation.assert_called_once_with(amphora) diff --git a/octavia/tests/unit/controller/worker/test_controller_worker.py b/octavia/tests/unit/controller/worker/test_controller_worker.py index e5147bb6c2..4caed0fba2 100644 --- a/octavia/tests/unit/controller/worker/test_controller_worker.py +++ b/octavia/tests/unit/controller/worker/test_controller_worker.py @@ -32,6 +32,7 @@ LB_ID = uuidutils.generate_uuid() POOL_ID = uuidutils.generate_uuid() HM_ID = uuidutils.generate_uuid() MEMBER_ID = uuidutils.generate_uuid() +COMPUTE_ID = uuidutils.generate_uuid() HEALTH_UPDATE_DICT = {'delay': 1, 'timeout': 2} LISTENER_UPDATE_DICT = {'name': 'test', 'description': 'test2'} MEMBER_UPDATE_DICT = {'weight': 1, 'ip_address': '10.0.0.0'} @@ -46,6 +47,7 @@ _load_balancer_mock = mock.MagicMock() _member_mock = mock.MagicMock() _pool_mock = mock.MagicMock() _create_map_flow_mock = mock.MagicMock() +_amphora_mock.load_balancer_id = LB_ID @mock.patch('octavia.db.repositories.AmphoraRepository.get', @@ -633,3 +635,32 @@ class TestControllerWorker(base.TestCase): 'update_dict': POOL_UPDATE_DICT})) _flow_mock.run.assert_called_once_with() + + @mock.patch('octavia.controller.worker.flows.' + 'amphora_flows.AmphoraFlows.get_failover_flow', + return_value=_flow_mock) + def test_failover_amphora(self, + mock_get_update_listener_flow, + mock_api_get_session, + mock_dyn_log_listener, + mock_taskflow_load, + mock_pool_repo_get, + mock_member_repo_get, + mock_listener_repo_get, + mock_lb_repo_get, + mock_health_mon_repo_get, + mock_amp_repo_get): + + _flow_mock.reset_mock() + + cw = controller_worker.ControllerWorker() + cw.failover_amphora(AMP_ID) + + (base_taskflow.BaseTaskFlowEngine._taskflow_load. + assert_called_once_with( + _flow_mock, + store={constants.AMPHORA: _amphora_mock, + constants.LOADBALANCER_ID: + _amphora_mock.load_balancer_id})) + + _flow_mock.run.assert_called_once_with() \ No newline at end of file diff --git a/octavia/tests/unit/network/drivers/neutron/test_allowed_address_pairs.py b/octavia/tests/unit/network/drivers/neutron/test_allowed_address_pairs.py index a8b77d8f73..401c043d90 100644 --- a/octavia/tests/unit/network/drivers/neutron/test_allowed_address_pairs.py +++ b/octavia/tests/unit/network/drivers/neutron/test_allowed_address_pairs.py @@ -34,6 +34,23 @@ class TestAllowedAddressPairsDriver(base.TestCase): k_session = None driver = None + SUBNET_ID_1 = "5" + SUBNET_ID_2 = "8" + FIXED_IP_ID_1 = "6" + FIXED_IP_ID_2 = "8" + NETWORK_ID_1 = "7" + NETWORK_ID_2 = "10" + IP_ADDRESS_1 = "10.0.0.2" + IP_ADDRESS_2 = "12.0.0.2" + AMPHORA_ID = "1" + LB_ID = "2" + COMPUTE_ID = "3" + ACTIVE = "ACTIVE" + LB_NET_IP = "10.0.0.2" + LB_NET_PORT_ID = "6" + HA_PORT_ID = "8" + HA_IP = "12.0.0.2" + def setUp(self): super(TestAllowedAddressPairsDriver, self).setUp() with mock.patch('octavia.common.clients.neutron_client.Client', @@ -468,3 +485,34 @@ class TestAllowedAddressPairsDriver(base.TestCase): self.driver.update_vip(lb) delete_rule.assert_called_once_with('ssh-rule') self.assertFalse(create_rule.called) + + def test_failover_preparation(self): + ports = {"ports": [ + {"fixed_ips": [{"subnet_id": self.SUBNET_ID_1, + "ip_address": self.IP_ADDRESS_1}], + "id": self.FIXED_IP_ID_1, "network_id": self.NETWORK_ID_1}, + {"fixed_ips": [{"subnet_id": self.SUBNET_ID_2, + "ip_address": self.IP_ADDRESS_2}], + "id": self.FIXED_IP_ID_2, "network_id": self.NETWORK_ID_2}]} + self.driver.neutron_client.list_ports.return_value = ports + self.driver.neutron_client.show_port = mock.Mock( + side_effect=self._failover_show_port_side_effect) + port_update = self.driver.neutron_client.update_port + amphora = data_models.Amphora( + id=self.AMPHORA_ID, load_balancer_id=self.LB_ID, + compute_id=self.COMPUTE_ID, status=self.ACTIVE, + lb_network_ip=self.LB_NET_IP, ha_port_id=self.HA_PORT_ID, + ha_ip=self.HA_IP) + self.driver.failover_preparation(amphora) + port_update.assert_called_once_with(ports["ports"][1].get("id"), + {'port': {'device_id': ''}}) + + def _failover_show_port_side_effect(self, port_id): + if port_id == self.LB_NET_PORT_ID: + return {"fixed_ips": [{"subnet_id": self.SUBNET_ID_1, + "ip_address": self.IP_ADDRESS_1}], + "id": self.FIXED_IP_ID_1, "network_id": self.NETWORK_ID_1} + if port_id == self.HA_PORT_ID: + return {"fixed_ips": [{"subnet_id": self.SUBNET_ID_2, + "ip_address": self.IP_ADDRESS_2}], + "id": self.FIXED_IP_ID_2, "network_id": self.NETWORK_ID_2} diff --git a/specs/version0.5/controller-worker.rst b/specs/version0.5/controller-worker.rst index 26b9569e47..46551d3511 100644 --- a/specs/version0.5/controller-worker.rst +++ b/specs/version0.5/controller-worker.rst @@ -206,6 +206,15 @@ The Controller Worker library will provide the following methods: """ raise NotImplementedError + def failover_amphora(self, amphora_id): + """Failover an amphora + + :param amp_id: ID of the amphora to fail over + :returns: None + :raises AmphoraNotFound: The referenced Amphora was not found + """ + raise NotImplementedError + Alternatives ------------ This code could be included in the Queue Consumer component of the controller. diff --git a/specs/version0.5/network-driver-interface.rst b/specs/version0.5/network-driver-interface.rst index e75454e0da..721c922d4c 100644 --- a/specs/version0.5/network-driver-interface.rst +++ b/specs/version0.5/network-driver-interface.rst @@ -236,6 +236,13 @@ class AbstractNetworkDriver * returns = Port data model * raises NetworkException, PortNotFound +* failover_preparation(amphora): + + * Prepare an amphora for failover + * amphora = amphora data model + * returns = None + * raises PortNotFound + Alternatives ------------