Updates the failover flow for active/standby

This patch updates the failover flow to be active/standby aware.

The patch also updates loadbalancer.amphorae iterrators to check
if the amphora is in the "ALLOCATED" state so we don't try to
take action on amphora that are no longer in use.

Depends-On: I931d826690c925f022dbfffe9afb7bf41345b1d0
Change-Id: I0ff3f284c92b603fc33c1ff606b41b45b4d43022
Implements: blueprint activepassiveamphora
This commit is contained in:
Michael Johnson 2015-12-04 22:31:54 +00:00
parent 6f786fdad7
commit c7f88bb435
24 changed files with 514 additions and 143 deletions

View File

@ -18,7 +18,7 @@
{% macro peers_macro(constants,listener) %}
{% if listener.topology == constants.TOPOLOGY_ACTIVE_STANDBY %}
peers {{ "%s_peers"|format(listener.id.replace("-", ""))|trim() }}
{% for amp in listener.amphorae %}
{% for amp in listener.amphorae if amp.status == constants.AMPHORA_ALLOCATED %}
{# HAProxy has peer name limitations, thus the hash filter #}
peer {{ amp.id|hash_amp_id|replace('=', '') }} {{ amp.vrrp_ip }}:{{ listener.peer_port }}
{% endfor %}

View File

@ -16,6 +16,7 @@ import os
import jinja2
from oslo_config import cfg
import six
from octavia.amphorae.backends.agent.api_server import util
from octavia.common import constants
@ -66,7 +67,9 @@ class KeepalivedJinjaTemplater(object):
# several backend services. To disable the fallback behavior, we need
# to add the "nopreempt" flag in the backup instance section.
peers_ips = []
for amp in loadbalancer.amphorae:
for amp in six.moves.filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
if amp.vrrp_ip != amphora.vrrp_ip:
peers_ips.append(amp.vrrp_ip)
return self.get_template(self.keepalived_template).render(

View File

@ -13,6 +13,7 @@
# under the License.
from oslo_log import log as logging
import six
from octavia.amphorae.drivers import driver_base as driver_base
from octavia.amphorae.drivers.keepalived.jinja import jinja_cfg
@ -42,7 +43,9 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
LOG.debug("Update loadbalancer %s amphora VRRP configuration.",
loadbalancer.id)
for amp in loadbalancer.amphorae:
for amp in six.moves.filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
# Generate Keepalived configuration from loadbalancer object
config = templater.build_keepalived_config(loadbalancer, amp)
self.client.upload_vrrp_config(amp, config)
@ -54,7 +57,11 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
"""
LOG.info(_LI("Stop loadbalancer %s amphora VRRP Service."),
loadbalancer.id)
for amp in loadbalancer.amphorae:
for amp in six.moves.filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
self.client.stop_vrrp(amp)
def start_vrrp_service(self, loadbalancer):
@ -64,7 +71,11 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
"""
LOG.info(_LI("Start loadbalancer %s amphora VRRP Service."),
loadbalancer.id)
for amp in loadbalancer.amphorae:
for amp in six.moves.filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
LOG.debug("Start VRRP Service on amphora %s .", amp.lb_network_ip)
self.client.start_vrrp(amp)
@ -75,5 +86,9 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
"""
LOG.info(_LI("Reload loadbalancer %s amphora VRRP Service."),
loadbalancer.id)
for amp in loadbalancer.amphorae:
for amp in six.moves.filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
self.client.reload_vrrp(amp)

View File

@ -76,6 +76,7 @@ SUPPORTED_AMPHORA_TYPES = (AMPHORA_VM,)
# Task/Flow constants
AMPHORA = 'amphora'
FAILED_AMPHORA = 'failed_amphora'
FAILOVER_AMPHORA = 'failover_amphora'
AMPHORAE = 'amphorae'
AMPHORA_ID = 'amphora_id'
@ -91,6 +92,7 @@ MEMBER = 'member'
MEMBER_ID = 'member_id'
COMPUTE_ID = 'compute_id'
COMPUTE_OBJ = 'compute_obj'
AMP_DATA = 'amp_data'
AMPS_DATA = 'amps_data'
NICS = 'nics'
VIP = 'vip'

View File

@ -25,9 +25,10 @@ from octavia.controller.worker.flows import member_flows
from octavia.controller.worker.flows import pool_flows
from octavia.db import api as db_apis
from octavia.db import repositories as repo
from octavia.i18n import _LI
from octavia.i18n import _LE, _LI
from oslo_config import cfg
from oslo_utils import excutils
from taskflow.listeners import logging as tf_logging
CONF = cfg.CONF
@ -470,16 +471,20 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
:raises AmphoraNotFound: The referenced amphora was not found
"""
amp = self._amphora_repo.get(db_apis.get_session(),
id=amphora_id)
try:
amp = self._amphora_repo.get(db_apis.get_session(),
id=amphora_id)
failover_amphora_tf = self._taskflow_load(
self._amphora_flows.get_failover_flow(),
store={constants.AMPHORA: amp,
constants.LOADBALANCER_ID: amp.load_balancer_id})
with tf_logging.DynamicLoggingListener(failover_amphora_tf,
log=LOG):
failover_amphora_tf.run()
failover_amphora_tf = self._taskflow_load(
self._amphora_flows.get_failover_flow(role=amp.role),
store={constants.FAILED_AMPHORA: amp,
constants.LOADBALANCER_ID: amp.load_balancer_id})
with tf_logging.DynamicLoggingListener(failover_amphora_tf,
log=LOG):
failover_amphora_tf.run()
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Failover exception: %s") % e)
def amphora_cert_rotation(self, amphora_id):
"""Perform cert rotation for an amphora.

View File

@ -223,10 +223,12 @@ class AmphoraFlows(object):
# Setup the decider for the path if we can map an amphora
amp_for_lb_flow.link(allocate_and_associate_amp, map_lb_to_amp,
decider=self._allocate_amp_to_lb_decider)
decider=self._allocate_amp_to_lb_decider,
decider_depth='flow')
# Setup the decider for the path if we can't map an amphora
amp_for_lb_flow.link(allocate_and_associate_amp, create_amp,
decider=self._create_new_amp_for_lb_decider)
decider=self._create_new_amp_for_lb_decider,
decider_depth='flow')
return amp_for_lb_flow
@ -255,7 +257,7 @@ class AmphoraFlows(object):
requires=constants.AMPHORA))
return delete_amphora_flow
def get_failover_flow(self):
def get_failover_flow(self, role=constants.ROLE_STANDALONE):
"""Creates a flow to failover a stale amphora
:returns: The flow for amphora failover
@ -265,61 +267,53 @@ class AmphoraFlows(object):
constants.FAILOVER_AMPHORA_FLOW)
failover_amphora_flow.add(
network_tasks.RetrievePortIDsOnAmphoraExceptLBNetwork(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA, provides=constants.PORTS))
failover_amphora_flow.add(network_tasks.FailoverPreparationForAmphora(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
failover_amphora_flow.add(database_tasks.
MarkAmphoraPendingDeleteInDB(
requires=constants.AMPHORA))
failover_amphora_flow.add(database_tasks.
MarkAmphoraHealthBusy(
requires=constants.AMPHORA))
# Delete the old amphora
failover_amphora_flow.add(
database_tasks.MarkAmphoraPendingDeleteInDB(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
failover_amphora_flow.add(
database_tasks.MarkAmphoraHealthBusy(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
failover_amphora_flow.add(compute_tasks.ComputeDelete(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
failover_amphora_flow.add(database_tasks.
DisableAmphoraHealthMonitoring(
requires=constants.AMPHORA))
failover_amphora_flow.add(
database_tasks.DisableAmphoraHealthMonitoring(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
failover_amphora_flow.add(database_tasks.MarkAmphoraDeletedInDB(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
failover_amphora_flow.add(database_tasks.CreateAmphoraInDB(
provides=constants.AMPHORA_ID))
# Save failed amphora details for later
failover_amphora_flow.add(
database_tasks.GetUpdatedFailoverAmpNetworkDetailsAsList(
requires=(constants.AMPHORA_ID, constants.AMPHORA),
provides=constants.AMPS_DATA))
if self.REST_AMPHORA_DRIVER:
failover_amphora_flow.add(cert_task.GenerateServerPEMTask(
provides=constants.SERVER_PEM))
failover_amphora_flow.add(compute_tasks.CertComputeCreate(
requires=(constants.AMPHORA_ID, constants.SERVER_PEM),
provides=constants.COMPUTE_ID))
else:
failover_amphora_flow.add(compute_tasks.ComputeCreate(
requires=constants.AMPHORA_ID, provides=constants.COMPUTE_ID))
failover_amphora_flow.add(database_tasks.UpdateAmphoraComputeId(
requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
failover_amphora_flow.add(
database_tasks.AssociateFailoverAmphoraWithLBID(
requires=(constants.AMPHORA_ID, constants.LOADBALANCER_ID)))
failover_amphora_flow.add(database_tasks.MarkAmphoraBootingInDB(
requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
failover_amphora_flow.add(compute_tasks.ComputeWait(
requires=constants.COMPUTE_ID,
provides=constants.COMPUTE_OBJ))
failover_amphora_flow.add(database_tasks.UpdateAmphoraInfo(
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
provides=constants.AMPHORA))
failover_amphora_flow.add(database_tasks.ReloadAmphora(
requires=constants.AMPHORA_ID,
provides=constants.FAILOVER_AMPHORA))
failover_amphora_flow.add(amphora_driver_tasks.AmphoraFinalize(
rebind={constants.AMPHORA: constants.FAILOVER_AMPHORA},
requires=constants.AMPHORA))
failover_amphora_flow.add(database_tasks.UpdateAmphoraVIPData(
requires=constants.AMPS_DATA))
database_tasks.GetAmphoraDetails(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA,
provides=constants.AMP_DATA))
# Get a new amphora
# Note: Role doesn't matter here. We will update it later.
get_amp_subflow = self.get_amphora_for_lb_subflow(
prefix=constants.FAILOVER_AMPHORA_FLOW)
failover_amphora_flow.add(get_amp_subflow)
# Update the new amphora with the failed amphora details
failover_amphora_flow.add(database_tasks.UpdateAmpFailoverDetails(
requires=(constants.AMPHORA, constants.AMP_DATA)))
failover_amphora_flow.add(database_tasks.ReloadLoadBalancer(
requires=constants.LOADBALANCER_ID,
provides=constants.LOADBALANCER))
failover_amphora_flow.add(network_tasks.GetAmphoraeNetworkConfigs(
requires=constants.LOADBALANCER,
provides=constants.AMPHORAE_NETWORK_CONFIG))
@ -334,22 +328,56 @@ class AmphoraFlows(object):
constants.AMPHORAE_NETWORK_CONFIG)))
failover_amphora_flow.add(
network_tasks.GetMemberPorts(
rebind={constants.AMPHORA: constants.FAILOVER_AMPHORA},
requires=(constants.LOADBALANCER, constants.AMPHORA),
provides=constants.MEMBER_PORTS
))
failover_amphora_flow.add(amphora_driver_tasks.AmphoraPostNetworkPlug(
rebind={constants.AMPHORA: constants.FAILOVER_AMPHORA,
constants.PORTS: constants.MEMBER_PORTS},
rebind={constants.PORTS: constants.MEMBER_PORTS},
requires=(constants.AMPHORA, constants.PORTS)))
# Handle the amphora role and VRRP if necessary
if role == constants.ROLE_MASTER:
failover_amphora_flow.add(database_tasks.MarkAmphoraMasterInDB(
name=constants.MARK_AMP_MASTER_INDB,
requires=constants.AMPHORA))
vrrp_subflow = self.get_vrrp_subflow(role)
failover_amphora_flow.add(vrrp_subflow)
elif role == constants.ROLE_BACKUP:
failover_amphora_flow.add(database_tasks.MarkAmphoraBackupInDB(
name=constants.MARK_AMP_BACKUP_INDB,
requires=constants.AMPHORA))
vrrp_subflow = self.get_vrrp_subflow(role)
failover_amphora_flow.add(vrrp_subflow)
elif role == constants.ROLE_STANDALONE:
failover_amphora_flow.add(
database_tasks.MarkAmphoraStandAloneInDB(
name=constants.MARK_AMP_STANDALONE_INDB,
requires=constants.AMPHORA))
failover_amphora_flow.add(amphora_driver_tasks.ListenersStart(
requires=(constants.LISTENERS, constants.VIP)))
failover_amphora_flow.add(database_tasks.MarkAmphoraAllocatedInDB(
rebind={constants.AMPHORA: constants.FAILOVER_AMPHORA},
requires=(constants.AMPHORA, constants.LOADBALANCER_ID)))
return failover_amphora_flow
def get_vrrp_subflow(self, prefix):
sf_name = prefix + '-' + constants.GET_VRRP_SUBFLOW
vrrp_subflow = linear_flow.Flow(sf_name)
vrrp_subflow.add(amphora_driver_tasks.AmphoraUpdateVRRPInterface(
name=sf_name + '-' + constants.AMP_UPDATE_VRRP_INTF,
requires=constants.LOADBALANCER,
provides=constants.LOADBALANCER))
vrrp_subflow.add(database_tasks.CreateVRRPGroupForLB(
name=sf_name + '-' + constants.CREATE_VRRP_GROUP_FOR_LB,
requires=constants.LOADBALANCER,
provides=constants.LOADBALANCER))
vrrp_subflow.add(amphora_driver_tasks.AmphoraVRRPUpdate(
name=sf_name + '-' + constants.AMP_VRRP_UPDATE,
requires=constants.LOADBALANCER))
vrrp_subflow.add(amphora_driver_tasks.AmphoraVRRPStart(
name=sf_name + '-' + constants.AMP_VRRP_START,
requires=constants.LOADBALANCER))
return vrrp_subflow
def cert_rotate_amphora_flow(self):
"""Implement rotation for amphora's cert.

View File

@ -93,7 +93,7 @@ class LoadBalancerFlows(object):
post_create_LB_flow.add(new_LB_net_subflow)
if topology == constants.TOPOLOGY_ACTIVE_STANDBY:
vrrp_subflow = self._get_vrrp_subflow(prefix)
vrrp_subflow = self.amp_flows.get_vrrp_subflow(prefix)
post_create_LB_flow.add(vrrp_subflow)
post_create_LB_flow.add(database_tasks.UpdateLoadbalancerInDB(
@ -103,25 +103,6 @@ class LoadBalancerFlows(object):
requires=constants.LOADBALANCER))
return post_create_LB_flow
def _get_vrrp_subflow(self, prefix):
sf_name = prefix + '-' + constants.GET_VRRP_SUBFLOW
vrrp_subflow = linear_flow.Flow(sf_name)
vrrp_subflow.add(amphora_driver_tasks.AmphoraUpdateVRRPInterface(
name=sf_name + '-' + constants.AMP_UPDATE_VRRP_INTF,
requires=constants.LOADBALANCER,
provides=constants.LOADBALANCER))
vrrp_subflow.add(database_tasks.CreateVRRPGroupForLB(
name=sf_name + '-' + constants.CREATE_VRRP_GROUP_FOR_LB,
requires=constants.LOADBALANCER,
provides=constants.LOADBALANCER))
vrrp_subflow.add(amphora_driver_tasks.AmphoraVRRPUpdate(
name=sf_name + '-' + constants.AMP_VRRP_UPDATE,
requires=constants.LOADBALANCER))
vrrp_subflow.add(amphora_driver_tasks.AmphoraVRRPStart(
name=sf_name + '-' + constants.AMP_VRRP_START,
requires=constants.LOADBALANCER))
return vrrp_subflow
def get_delete_load_balancer_flow(self):
"""Creates a flow to delete a load balancer.

View File

@ -16,6 +16,7 @@
import logging
from oslo_config import cfg
import six
from stevedore import driver as stevedore_driver
from taskflow import task
from taskflow.types import failure
@ -222,7 +223,10 @@ class AmphoraePostNetworkPlug(BaseAmphoraTask):
if isinstance(result, failure.Failure):
return
LOG.warn(_LW("Reverting post network plug."))
for amphora in loadbalancer.amphorae:
for amphora in six.moves.filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
self.amphora_repo.update(db_apis.get_session(), id=amphora.id,
status=constants.ERROR)
@ -261,13 +265,15 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask):
def execute(self, loadbalancer):
"""Execute post_vip_routine."""
amps = []
for amp in loadbalancer.amphorae:
# Currently this is supported only with REST Driver
interface = self.amphora_driver.get_vrrp_interface(amp)
self.amphora_repo.update(db_apis.get_session(), amp.id,
vrrp_interface=interface)
amps.append(self.amphora_repo.get(db_apis.get_session(),
id=amp.id))
for amp in six.moves.filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
# Currently this is supported only with REST Driver
interface = self.amphora_driver.get_vrrp_interface(amp)
self.amphora_repo.update(db_apis.get_session(), amp.id,
vrrp_interface=interface)
amps.append(self.amphora_repo.get(db_apis.get_session(),
id=amp.id))
loadbalancer.amphorae = amps
return loadbalancer
@ -276,7 +282,10 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask):
if isinstance(result, failure.Failure):
return
LOG.warn(_LW("Reverting Get Amphora VRRP Interface."))
for amp in loadbalancer.amphorae:
for amp in six.moves.filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
self.amphora_repo.update(db_apis.get_session(), amp.id,
vrrp_interface=None)

View File

@ -17,6 +17,7 @@ import logging
import time
from oslo_config import cfg
import six
from stevedore import driver as stevedore_driver
from taskflow import task
from taskflow.types import failure
@ -123,7 +124,10 @@ class DeleteAmphoraeOnLoadBalancer(BaseComputeTask):
"""
def execute(self, loadbalancer):
for amp in loadbalancer.amphorae:
for amp in six.moves.filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
try:
self.compute.delete(amp.compute_id)
except Exception:

View File

@ -16,6 +16,7 @@
import logging
from oslo_config import cfg
from oslo_db import exception as odb_exceptions
from oslo_utils import uuidutils
import sqlalchemy
from taskflow import task
@ -297,6 +298,25 @@ class UpdateAmphoraVIPData(BaseDatabaseTask):
vrrp_id=1)
class UpdateAmpFailoverDetails(BaseDatabaseTask):
"""Update amphora failover details in the database."""
def execute(self, amphora, amp_data):
"""Update amphora failover details in the database.
:param loadbalancer_id: The load balancer ID to lookup
:param mps_data: The load balancer ID to lookup
"""
self.repos.amphora.update(db_apis.get_session(), amphora.id,
vrrp_ip=amp_data.vrrp_ip,
ha_ip=amp_data.ha_ip,
vrrp_port_id=amp_data.vrrp_port_id,
ha_port_id=amp_data.ha_port_id,
role=amp_data.role,
vrrp_id=amp_data.vrrp_id,
vrrp_priority=amp_data.vrrp_priority)
class AssociateFailoverAmphoraWithLBID(BaseDatabaseTask):
def execute(self, amphora_id, loadbalancer_id):
@ -464,8 +484,8 @@ class MarkAmphoraDeletedInDB(BaseDatabaseTask):
"""Mark the amphora as pending delete in DB."""
LOG.debug("Mark DELETED in DB for amphora: %(amp)s with "
"compute id %(id)s",
{'amp': amphora.id, 'id': amphora.compute_id})
"compute id %(comp)s",
{'amp': amphora.id, 'comp': amphora.compute_id})
self.amphora_repo.update(db_apis.get_session(), amphora.id,
status=constants.DELETED)
@ -939,17 +959,18 @@ class UpdatePoolInDB(BaseDatabaseTask):
pool.id, {'enabled': 0}, None)
class GetUpdatedFailoverAmpNetworkDetailsAsList(BaseDatabaseTask):
class GetAmphoraDetails(BaseDatabaseTask):
"""Task to retrieve amphora network details."""
def execute(self, amphora_id, amphora):
amp_net_configs = [data_models.Amphora(
id=amphora_id,
vrrp_ip=amphora.vrrp_ip,
ha_ip=amphora.ha_ip,
vrrp_port_id=amphora.vrrp_port_id,
ha_port_id=amphora.ha_port_id)]
return amp_net_configs
def execute(self, amphora):
return data_models.Amphora(id=amphora.id,
vrrp_ip=amphora.vrrp_ip,
ha_ip=amphora.ha_ip,
vrrp_port_id=amphora.vrrp_port_id,
ha_port_id=amphora.ha_port_id,
role=amphora.role,
vrrp_id=amphora.vrrp_id,
vrrp_priority=amphora.vrrp_priority)
class GetListenersFromLoadbalancer(BaseDatabaseTask):
@ -973,13 +994,17 @@ class GetVipFromLoadbalancer(BaseDatabaseTask):
class CreateVRRPGroupForLB(BaseDatabaseTask):
def execute(self, loadbalancer):
loadbalancer.vrrp_group = self.repos.vrrpgroup.create(
db_apis.get_session(),
load_balancer_id=loadbalancer.id,
vrrp_group_name=str(loadbalancer.id).replace('-', ''),
vrrp_auth_type=constants.VRRP_AUTH_DEFAULT,
vrrp_auth_pass=uuidutils.generate_uuid().replace('-', '')[0:7],
advert_int=CONF.keepalived_vrrp.vrrp_advert_int)
try:
loadbalancer.vrrp_group = self.repos.vrrpgroup.create(
db_apis.get_session(),
load_balancer_id=loadbalancer.id,
vrrp_group_name=str(loadbalancer.id).replace('-', ''),
vrrp_auth_type=constants.VRRP_AUTH_DEFAULT,
vrrp_auth_pass=uuidutils.generate_uuid().replace('-', '')[0:7],
advert_int=CONF.keepalived_vrrp.vrrp_advert_int)
except odb_exceptions.DBDuplicateEntry:
LOG.debug('VRRP_GROUP entry already exists for load balancer, '
'skipping create.')
return loadbalancer

View File

@ -110,7 +110,10 @@ class CalculateDelta(BaseNetworkTask):
calculate_amp = CalculateAmphoraDelta()
deltas = {}
for amphora in loadbalancer.amphorae:
for amphora in six.moves.filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
delta = calculate_amp.execute(loadbalancer, amphora)
deltas[amphora.id] = delta
return deltas

View File

@ -18,6 +18,7 @@ from neutronclient.common import exceptions as neutron_client_exceptions
from novaclient import exceptions as nova_client_exceptions
from oslo_config import cfg
from oslo_log import log as logging
import six
from octavia.common import clients
from octavia.common import constants
@ -198,6 +199,18 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
raise base.DeallocateVIPException(message)
def deallocate_vip(self, vip):
# Delete the vrrp_port (instance port) in case nova didn't
# This can happen if a failover has occurred.
try:
for amphora in six.moves.filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
vip.load_balancer.amphorae):
self.neutron_client.delete_port(amphora.vrrp_port_id)
except base.PortNotFound:
LOG.debug('VIP instance port {0} already deleted. '
'Skipping.'.format(amphora.vrrp_port_id))
try:
port = self.get_port(vip.port_id)
except base.PortNotFound:
@ -238,7 +251,10 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
self._update_vip_security_group(load_balancer, vip)
plugged_amphorae = []
subnet = self.get_subnet(vip.subnet_id)
for amphora in load_balancer.amphorae:
for amphora in six.moves.filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
load_balancer.amphorae):
interface = self._get_plugged_interface(amphora.compute_id,
subnet.network_id)
if not interface:
@ -300,7 +316,10 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
msg = ("Can't unplug vip because vip subnet {0} was not "
"found").format(vip.subnet_id)
raise base.PluggedVIPNotFound(msg)
for amphora in load_balancer.amphorae:
for amphora in six.moves.filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
load_balancer.amphorae):
interface = self._get_plugged_interface(amphora.compute_id,
subnet.network_id)
if not interface:

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from octavia.common import constants
from octavia.common import data_models
@ -38,6 +39,8 @@ def generate_load_balancer(vip=None, amphorae=None):
for amp in lb.amphorae:
amp.load_balancer = lb
amp.load_balancer_id = lb.id
amp.status = constants.AMPHORA_ALLOCATED
amp.vrrp_port_id = 'vrrp_port-{0}-id'.format(VIP_SEED)
if vip:
vip.load_balancer = lb
vip.load_balancer_id = lb.id
@ -73,4 +76,4 @@ def generate_amphora(load_balancer=None):
load_balancer=load_balancer)
if load_balancer:
amp.load_balancer_id = load_balancer.id
return amp
return amp

View File

@ -28,6 +28,7 @@ class TestVRRPRestDriver(base.TestCase):
self.templater = jinja_cfg.KeepalivedJinjaTemplater()
self.amphora1 = mock.MagicMock()
self.amphora1.status = constants.AMPHORA_ALLOCATED
self.amphora1.vrrp_ip = '10.0.0.1'
self.amphora1.role = constants.ROLE_MASTER
self.amphora1.vrrp_interface = 'eth1'
@ -35,6 +36,7 @@ class TestVRRPRestDriver(base.TestCase):
self.amphora1.vrrp_priority = 100
self.amphora2 = mock.MagicMock()
self.amphora2.status = constants.AMPHORA_ALLOCATED
self.amphora2.vrrp_ip = '10.0.0.2'
self.amphora2.role = constants.ROLE_BACKUP
self.amphora2.vrrp_interface = 'eth1'

View File

@ -16,6 +16,7 @@
import mock
from octavia.amphorae.drivers.keepalived import vrrp_rest_driver
from octavia.common import constants
import octavia.tests.unit.base as base
@ -28,6 +29,7 @@ class TestVRRPRestDriver(base.TestCase):
self.FAKE_CONFIG = 'FAKE CONFIG'
self.lb_mock = mock.MagicMock()
self.amphora_mock = mock.MagicMock()
self.amphora_mock.status = constants.AMPHORA_ALLOCATED
self.lb_mock.amphorae = [self.amphora_mock]
super(TestVRRPRestDriver, self).setUp()

View File

@ -147,6 +147,27 @@ class TestAmphoraFlows(base.TestCase):
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(1, len(amp_flow.requires))
def test_get_cert_bogus_create_amphora_for_lb_flow(self):
cfg.CONF.set_override('amphora_driver', 'amphora_haproxy_rest_driver',
group='controller_worker')
self.AmpFlow = amphora_flows.AmphoraFlows()
amp_flow = self.AmpFlow._get_create_amp_for_lb_subflow(
'SOMEPREFIX', 'BOGUS_ROLE')
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
self.assertIn(constants.AMPHORA, amp_flow.provides)
self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
self.assertIn(constants.COMPUTE_OBJ, amp_flow.provides)
self.assertIn(constants.SERVER_PEM, amp_flow.provides)
self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(1, len(amp_flow.requires))
def test_get_delete_amphora_flow(self):
amp_flow = self.AmpFlow.get_delete_amphora_flow()
@ -182,20 +203,69 @@ class TestAmphoraFlows(base.TestCase):
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn(constants.AMPHORA, amp_flow.requires)
self.assertIn(constants.FAILED_AMPHORA, amp_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
self.assertIn(constants.FAILOVER_AMPHORA, amp_flow.provides)
self.assertIn(constants.AMPHORA, amp_flow.provides)
self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
self.assertIn(constants.COMPUTE_OBJ, amp_flow.provides)
self.assertIn(constants.AMPS_DATA, amp_flow.provides)
self.assertIn(constants.PORTS, amp_flow.provides)
self.assertIn(constants.LISTENERS, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(12, len(amp_flow.provides))
self.assertEqual(11, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(role=constants.ROLE_MASTER)
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn(constants.FAILED_AMPHORA, amp_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
self.assertIn(constants.AMPHORA, amp_flow.provides)
self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
self.assertIn(constants.COMPUTE_OBJ, amp_flow.provides)
self.assertIn(constants.PORTS, amp_flow.provides)
self.assertIn(constants.LISTENERS, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(11, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(role=constants.ROLE_BACKUP)
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn(constants.FAILED_AMPHORA, amp_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
self.assertIn(constants.AMPHORA, amp_flow.provides)
self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
self.assertIn(constants.COMPUTE_OBJ, amp_flow.provides)
self.assertIn(constants.PORTS, amp_flow.provides)
self.assertIn(constants.LISTENERS, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(11, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(role='BOGUSROLE')
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn(constants.FAILED_AMPHORA, amp_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
self.assertIn(constants.AMPHORA, amp_flow.provides)
self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
self.assertIn(constants.COMPUTE_OBJ, amp_flow.provides)
self.assertIn(constants.PORTS, amp_flow.provides)
self.assertIn(constants.LISTENERS, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(11, len(amp_flow.provides))
def test_cert_rotate_amphora_flow(self):
cfg.CONF.set_override('amphora_driver', 'amphora_haproxy_rest_driver',
@ -210,3 +280,63 @@ class TestAmphoraFlows(base.TestCase):
self.assertEqual(1, len(amp_rotate_flow.provides))
self.assertEqual(2, len(amp_rotate_flow.requires))
def test_get_vrrp_subflow(self):
vrrp_subflow = self.AmpFlow.get_vrrp_subflow('123')
self.assertIsInstance(vrrp_subflow, flow.Flow)
self.assertIn(constants.LOADBALANCER, vrrp_subflow.provides)
self.assertIn(constants.LOADBALANCER, vrrp_subflow.requires)
self.assertEqual(1, len(vrrp_subflow.provides))
self.assertEqual(1, len(vrrp_subflow.requires))
def test_get_post_map_lb_subflow(self):
self.AmpFlow = amphora_flows.AmphoraFlows()
amp_flow = self.AmpFlow._get_post_map_lb_subflow(
'SOMEPREFIX', constants.ROLE_MASTER)
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn(constants.AMPHORA_ID, amp_flow.requires)
self.assertIn(constants.AMPHORA, amp_flow.provides)
self.assertEqual(1, len(amp_flow.provides))
self.assertEqual(1, len(amp_flow.requires))
amp_flow = self.AmpFlow._get_post_map_lb_subflow(
'SOMEPREFIX', constants.ROLE_BACKUP)
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn(constants.AMPHORA_ID, amp_flow.requires)
self.assertIn(constants.AMPHORA, amp_flow.provides)
self.assertEqual(1, len(amp_flow.provides))
self.assertEqual(1, len(amp_flow.requires))
amp_flow = self.AmpFlow._get_post_map_lb_subflow(
'SOMEPREFIX', constants.ROLE_STANDALONE)
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn(constants.AMPHORA_ID, amp_flow.requires)
self.assertIn(constants.AMPHORA, amp_flow.provides)
self.assertEqual(1, len(amp_flow.provides))
self.assertEqual(1, len(amp_flow.requires))
amp_flow = self.AmpFlow._get_post_map_lb_subflow(
'SOMEPREFIX', 'BOGUS_ROLE')
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn(constants.AMPHORA_ID, amp_flow.requires)
self.assertIn(constants.AMPHORA, amp_flow.provides)
self.assertEqual(1, len(amp_flow.provides))
self.assertEqual(1, len(amp_flow.requires))

View File

@ -117,15 +117,3 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertEqual(4, len(amp_flow.provides))
self.assertEqual(2, len(amp_flow.requires))
def test_get_vrrp_subflow(self):
vrrp_subflow = self.LBFlow._get_vrrp_subflow('123')
self.assertIsInstance(vrrp_subflow, flow.Flow)
self.assertIn(constants.LOADBALANCER, vrrp_subflow.provides)
self.assertIn(constants.LOADBALANCER, vrrp_subflow.requires)
self.assertEqual(1, len(vrrp_subflow.provides))
self.assertEqual(1, len(vrrp_subflow.requires))

View File

@ -30,6 +30,7 @@ LB_ID = uuidutils.generate_uuid()
_amphora_mock = mock.MagicMock()
_amphora_mock.id = AMP_ID
_amphora_mock.status = constants.AMPHORA_ALLOCATED
_listener_mock = mock.MagicMock()
_listener_mock.id = LISTENER_ID
_load_balancer_mock = mock.MagicMock()

View File

@ -72,6 +72,7 @@ class TestComputeTasks(base.TestCase):
conf.config(group="keystone_authtoken", auth_version=AUTH_VERSION)
_amphora_mock.id = AMPHORA_ID
_amphora_mock.status = constants.AMPHORA_ALLOCATED
logging_mock = mock.MagicMock()
compute_tasks.LOG = logging_mock

View File

@ -13,7 +13,10 @@
# under the License.
#
import random
import mock
from oslo_db import exception as odb_exceptions
from oslo_utils import uuidutils
from taskflow.types import failure
@ -30,11 +33,29 @@ LB_NET_IP = '192.0.2.2'
LISTENER_ID = uuidutils.generate_uuid()
POOL_ID = uuidutils.generate_uuid()
MEMBER_ID = uuidutils.generate_uuid()
PORT_ID = uuidutils.generate_uuid()
SUBNET_ID = uuidutils.generate_uuid()
VRRP_PORT_ID = uuidutils.generate_uuid()
HA_PORT_ID = uuidutils.generate_uuid()
VIP_IP = '192.0.5.2'
VRRP_IP = '192.0.5.3'
HA_IP = '192.0.5.4'
AMP_ROLE = 'FAKE_ROLE'
VRRP_ID = random.randrange(255)
VRRP_PRIORITY = random.randrange(100)
_amphora_mock = mock.MagicMock()
_amphora_mock.id = AMP_ID
_amphora_mock.compute_id = COMPUTE_ID
_amphora_mock.lb_network_ip = LB_NET_IP
_amphora_mock.vrrp_ip = VRRP_IP
_amphora_mock.ha_ip = HA_IP
_amphora_mock.ha_port_id = HA_PORT_ID
_amphora_mock.vrrp_port_id = VRRP_PORT_ID
_amphora_mock.role = AMP_ROLE
_amphora_mock.vrrp_id = VRRP_ID
_amphora_mock.vrrp_priority = VRRP_PRIORITY
_amphorae = [_amphora_mock]
_loadbalancer_mock = mock.MagicMock()
_loadbalancer_mock.id = LB_ID
_loadbalancer_mock.amphorae = [_amphora_mock]
@ -43,6 +64,11 @@ _pool_mock.id = POOL_ID
_listener_mock = mock.MagicMock()
_listener_mock.id = LISTENER_ID
_tf_failure_mock = mock.Mock(spec=failure.Failure)
_vip_mock = mock.MagicMock()
_vip_mock.port_id = PORT_ID
_vip_mock.subnet_id = SUBNET_ID
_vip_mock.ip_address = VIP_IP
_vrrp_group_mock = mock.MagicMock()
_cert_mock = mock.MagicMock()
_pem_mock = """Junk
-----BEGIN CERTIFICATE-----
@ -317,6 +343,102 @@ class TestDatabaseTasks(base.TestCase):
self.assertEqual(_listener_mock, listener)
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get',
return_value=_loadbalancer_mock)
@mock.patch('octavia.db.repositories.VipRepository.update')
def test_update_vip_after_allocation(self,
mock_vip_update,
mock_loadbalancer_get,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
update_vip = database_tasks.UpdateVIPAfterAllocation()
loadbalancer = update_vip.execute(LB_ID, _vip_mock)
self.assertEqual(_loadbalancer_mock, loadbalancer)
mock_vip_update.assert_called_once_with('TEST',
LB_ID,
port_id=PORT_ID,
subnet_id=SUBNET_ID,
ip_address=VIP_IP)
mock_loadbalancer_get.assert_called_once_with('TEST',
id=LB_ID)
def test_update_amphora_vip_data(self,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
update_amp_vip_data = database_tasks.UpdateAmphoraVIPData()
update_amp_vip_data.execute(_amphorae)
mock_amphora_repo_update.assert_called_once_with(
'TEST',
AMP_ID,
vrrp_ip=VRRP_IP,
ha_ip=HA_IP,
vrrp_port_id=VRRP_PORT_ID,
ha_port_id=HA_PORT_ID,
vrrp_id=1)
def test_update_amp_failover_details(self,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
update_amp_fo_details = database_tasks.UpdateAmpFailoverDetails()
update_amp_fo_details.execute(_amphora_mock, _amphora_mock)
mock_amphora_repo_update.assert_called_once_with(
'TEST',
AMP_ID,
vrrp_ip=VRRP_IP,
ha_ip=HA_IP,
vrrp_port_id=VRRP_PORT_ID,
ha_port_id=HA_PORT_ID,
role=AMP_ROLE,
vrrp_id=VRRP_ID,
vrrp_priority=VRRP_PRIORITY)
@mock.patch('octavia.db.repositories.AmphoraRepository.associate')
def test_associate_failover_amphora_with_lb_id(
self,
mock_associate,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
assoc_fo_amp_lb_id = database_tasks.AssociateFailoverAmphoraWithLBID()
assoc_fo_amp_lb_id.execute(AMP_ID, LB_ID)
mock_associate.assert_called_once_with('TEST',
load_balancer_id=LB_ID,
amphora_id=AMP_ID)
# Test revert
assoc_fo_amp_lb_id.revert(AMP_ID)
mock_amphora_repo_update.assert_called_once_with('TEST',
AMP_ID,
loadbalancer_id=None)
@mock.patch('octavia.db.repositories.AmphoraRepository.'
'allocate_and_associate',
side_effect=[_amphora_mock, None])
@ -945,6 +1067,26 @@ class TestDatabaseTasks(base.TestCase):
POOL_ID,
{'enabled': 0}, None)
def test_get_amphora_details(self,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
get_amp_details = database_tasks.GetAmphoraDetails()
new_amp = get_amp_details.execute(_amphora_mock)
self.assertEqual(AMP_ID, new_amp.id)
self.assertEqual(VRRP_IP, new_amp.vrrp_ip)
self.assertEqual(HA_IP, new_amp.ha_ip)
self.assertEqual(VRRP_PORT_ID, new_amp.vrrp_port_id)
self.assertEqual(AMP_ROLE, new_amp.role)
self.assertEqual(VRRP_ID, new_amp.vrrp_id)
self.assertEqual(VRRP_PRIORITY, new_amp.vrrp_priority)
def test_mark_amphora_role_indb(self,
mock_generate_uuid,
mock_LOG,
@ -1011,6 +1153,8 @@ class TestDatabaseTasks(base.TestCase):
mock_amphora_repo_update,
mock_amphora_repo_delete):
mock_get_session.side_effect = ['TEST',
odb_exceptions.DBDuplicateEntry]
create_vrrp_group = database_tasks.CreateVRRPGroupForLB()
create_vrrp_group.execute(_loadbalancer_mock)
mock_vrrp_group_create.assert_called_once_with(
@ -1020,6 +1164,7 @@ class TestDatabaseTasks(base.TestCase):
vrrp_auth_pass=mock_generate_uuid.return_value.replace('-',
'')[0:7],
advert_int=1)
create_vrrp_group.execute(_loadbalancer_mock)
@mock.patch('octavia.db.repositories.ListenerRepository.get')
def test_allocate_listener_peer_port(self,

View File

@ -63,6 +63,7 @@ class TestNetworkTasks(base.TestCase):
self.load_balancer_mock.amphorae = []
self.amphora_mock.id = AMPHORA_ID
self.amphora_mock.compute_id = COMPUTE_ID
self.amphora_mock.status = constants.AMPHORA_ALLOCATED
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="controller_worker", amp_network='netid')

View File

@ -667,7 +667,7 @@ class TestControllerWorker(base.TestCase):
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(
_flow_mock,
store={constants.AMPHORA: _amphora_mock,
store={constants.FAILED_AMPHORA: _amphora_mock,
constants.LOADBALANCER_ID:
_amphora_mock.load_balancer_id}))

View File

@ -114,21 +114,25 @@ class TestAllowedAddressPairsDriver(base.TestCase):
}
list_security_groups.return_value = security_groups
self.driver.deallocate_vip(vip)
delete_port.assert_called_once_with(vip.port_id)
delete_port.assert_called_with(vip.port_id)
delete_sec_grp.assert_called_once_with(sec_grp_id)
def test_deallocate_vip_when_delete_port_fails(self):
lb = dmh.generate_load_balancer_tree()
vip = data_models.Vip(port_id='1')
vip.load_balancer = lb
show_port = self.driver.neutron_client.show_port
show_port.return_value = {'port': {
'device_owner': allowed_address_pairs.OCTAVIA_OWNER}}
delete_port = self.driver.neutron_client.delete_port
delete_port.side_effect = TypeError
delete_port.side_effect = [None, None, TypeError]
self.assertRaises(network_base.DeallocateVIPException,
self.driver.deallocate_vip, vip)
def test_deallocate_vip_when_port_not_found(self):
lb = dmh.generate_load_balancer_tree()
vip = data_models.Vip(port_id='1')
vip.load_balancer = lb
show_port = self.driver.neutron_client.show_port
show_port.side_effect = neutron_exceptions.PortNotFoundClient
self.assertRaises(network_base.VIPConfigurationNotFound,
@ -144,7 +148,6 @@ class TestAllowedAddressPairsDriver(base.TestCase):
'id': vip.port_id,
'device_owner': 'neutron:LOADBALANCERV2',
'security_groups': [sec_grp_id]}}
delete_port = self.driver.neutron_client.delete_port
update_port = self.driver.neutron_client.update_port
delete_sec_grp = self.driver.neutron_client.delete_security_group
list_security_groups = self.driver.neutron_client.list_security_groups
@ -158,10 +161,11 @@ class TestAllowedAddressPairsDriver(base.TestCase):
expected_port_update = {'port': {'security_groups': []}}
update_port.assert_called_once_with(vip.port_id, expected_port_update)
delete_sec_grp.assert_called_once_with(sec_grp_id)
self.assertFalse(delete_port.called)
def test_deallocate_vip_when_vip_port_not_found(self):
lb = dmh.generate_load_balancer_tree()
vip = data_models.Vip(port_id='1')
vip.load_balancer = lb
admin_project_id = 'octavia'
session_mock = mock.MagicMock()
session_mock.get_project_id.return_value = admin_project_id

View File

@ -33,7 +33,7 @@ pyOpenSSL>=0.14 # Apache-2.0
WSME>=0.8 # MIT
Jinja2>=2.8 # BSD License (3 clause)
paramiko>=1.13.0 # LGPL
taskflow>=1.25.0 # Apache-2.0
taskflow>=1.26.0 # Apache-2.0
#for the amphora api
Flask<1.0,>=0.10 # BSD