Fixed various issues with create lb and amphora flow

Closes-Bug: #1450520
Closes-Bug: #1450524
Closes-Bug: #1450621

Change-Id: Ibc347dec4bebe10e0c7603bfa6b09833c84f181f
This commit is contained in:
Brandon Logan 2015-05-01 13:43:33 -05:00
parent 00d37ef490
commit 0837da18a7
20 changed files with 416 additions and 306 deletions

View File

@ -174,6 +174,7 @@ class HaproxyManager(driver_base.AmphoraLoadBalancerDriver):
raise exc.TimeOutException()
else:
return
raise exc.UnavailableException()
def _process_tls_certificates(self, listener):
"""Processes TLS data from the listener.

View File

@ -110,9 +110,9 @@ haproxy_amphora_opts = [
cfg.IntOpt('connection_max_retries',
default=10,
help=_('Retry threshold for connecting to amphorae.')),
cfg.IntOpt('connection_retry_interval',
default=5,
help=_('Retry timeout between attempts in seconds.'))
cfg.FloatOpt('connection_retry_interval',
default=1,
help=_('Retry timeout between attempts.'))
]
controller_worker_opts = [

View File

@ -79,9 +79,14 @@ SUPPORTED_AMPHORA_TYPES = (AMPHORA_VM,)
# Task/Flow constants
AMPHORA = 'amphora'
AMPHORA_ID = 'amphora_id'
DELTA = 'delta'
LISTENER = 'listener'
LOADBALANCER = 'loadbalancer'
LOADBALANCER_ID = 'loadbalancer_id'
COMPUTE_ID = 'compute_id'
COMPUTE_OBJ = 'compute_obj'
AMPS_DATA = 'amps_data'
NICS = 'nics'
VIP = 'vip'
@ -105,6 +110,10 @@ UPDATE_LOADBALANCER_FLOW = 'octavia-update-loadbalancer-flow'
UPDATE_MEMBER_FLOW = 'octavia-update-member-flow'
UPDATE_POOL_FLOW = 'octavia-update-pool-flow'
# Task Names
RELOAD_LB_AFTER_AMP_ASSOC = 'reload-lb-after-amp-assoc'
RELOAD_LB_AFTER_PLUG_VIP = 'reload-lb-after-plug-vip'
NOVA_1 = '1.1'
NOVA_2 = '2'
NOVA_3 = '3'

View File

@ -153,6 +153,10 @@ class IDAlreadyExists(OctaviaException):
code = 409
class NoReadyAmphoraeException(OctaviaException):
message = _LE('There are not any READY amphora available.')
class NoSuitableAmphoraException(OctaviaException):
message = _LE('Unable to allocate an amphora due to: %(msg)s')

View File

@ -132,10 +132,11 @@ class VirtualMachineManager(compute_base.ComputeBase):
# Extract information from nova response to populate desired amphora
# fields
lb_network_ip = nova_response.addresses[
self._nova_client.networks.get(
CONF.controller_worker.amp_network).label
][0]['addr']
net_name = self._nova_client.networks.get(
CONF.controller_worker.amp_network).label
lb_network_ip = None
if net_name in nova_response.addresses:
lb_network_ip = nova_response.addresses[net_name][0]['addr']
response = models.Amphora(
compute_id=nova_response.id,

View File

@ -16,6 +16,7 @@
import logging
from octavia.common import base_taskflow
from octavia.common import constants
from octavia.common import exceptions
from octavia.controller.worker.flows import amphora_flows
from octavia.controller.worker.flows import health_monitor_flows
@ -246,33 +247,24 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
# a conditional flow that would make this cleaner once implemented.
# https://review.openstack.org/#/c/98946/
lb = self._lb_repo.get(db_apis.get_session(),
id=load_balancer_id)
create_lb_tf = self._taskflow_load(self._lb_flows.
get_create_load_balancer_flow(),
store={'loadbalancer': lb})
store = {constants.LOADBALANCER_ID: load_balancer_id}
create_lb_tf = self._taskflow_load(
self._lb_flows.get_create_load_balancer_flow(), store=store)
with tf_logging.DynamicLoggingListener(create_lb_tf,
log=LOG):
amp = None
try:
create_lb_tf.run()
amp = create_lb_tf.storage.fetch('amphora')
except Exception:
pass
except exceptions.NoReadyAmphoraeException:
create_amp_lb_tf = self._taskflow_load(
self._amphora_flows.get_create_amphora_for_lb_flow(),
store=store)
if amp is None:
create_amp_lb_tf = self._taskflow_load(
self._amphora_flows.get_create_amphora_for_lb_flow(),
store={'loadbalancer': lb})
with tf_logging.DynamicLoggingListener(create_amp_lb_tf,
log=LOG):
try:
create_amp_lb_tf.run()
except exceptions.ComputeBuildException as e:
raise exceptions.NoSuitableAmphoraException(msg=e.msg)
with tf_logging.DynamicLoggingListener(create_amp_lb_tf,
log=LOG):
try:
create_amp_lb_tf.run()
except exceptions.ComputeBuildException as e:
raise exceptions.NoSuitableAmphoraException(msg=e.msg)
def delete_load_balancer(self, load_balancer_id):
"""Deletes a load balancer by de-allocating Amphorae.

View File

@ -44,17 +44,26 @@ class AmphoraFlows(object):
"""
create_amphora_flow = linear_flow.Flow(constants.CREATE_AMPHORA_FLOW)
create_amphora_flow.add(database_tasks.CreateAmphoraInDB(
provides='amphora'))
create_amphora_flow.add(compute_tasks.ComputeCreate())
create_amphora_flow.add(database_tasks.MarkAmphoraBootingInDB())
provides=constants.AMPHORA_ID))
create_amphora_flow.add(compute_tasks.ComputeCreate(
requires=constants.AMPHORA_ID,
provides=constants.COMPUTE_ID))
create_amphora_flow.add(database_tasks.MarkAmphoraBootingInDB(
requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
wait_flow = linear_flow.Flow('wait_for_amphora',
retry=retry.Times(CONF.
controller_worker.
amp_active_retries))
wait_flow.add(compute_tasks.ComputeWait())
wait_flow.add(compute_tasks.ComputeWait(
requires=constants.COMPUTE_ID))
create_amphora_flow.add(wait_flow)
create_amphora_flow.add(amphora_driver_tasks.AmphoraFinalize())
create_amphora_flow.add(database_tasks.MarkAmphoraReadyInDB())
create_amphora_flow.add(database_tasks.ReloadAmphora(
requires=constants.AMPHORA_ID,
provides=constants.AMPHORA))
create_amphora_flow.add(amphora_driver_tasks.AmphoraFinalize(
requires=constants.AMPHORA))
create_amphora_flow.add(database_tasks.MarkAmphoraReadyInDB(
requires=constants.AMPHORA))
return create_amphora_flow
@ -70,30 +79,43 @@ class AmphoraFlows(object):
create_amp_for_lb_flow = linear_flow.Flow(constants.
CREATE_AMPHORA_FOR_LB_FLOW)
create_amp_for_lb_flow.add(database_tasks.CreateAmphoraInDB(
provides='amphora'))
create_amp_for_lb_flow.add(compute_tasks.ComputeCreate())
create_amp_for_lb_flow.add(database_tasks.MarkAmphoraBootingInDB())
provides=constants.AMPHORA_ID))
create_amp_for_lb_flow.add(compute_tasks.ComputeCreate(
requires=constants.AMPHORA_ID,
provides=constants.COMPUTE_ID))
create_amp_for_lb_flow.add(database_tasks.UpdateAmphoraComputeId(
requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
create_amp_for_lb_flow.add(database_tasks.MarkAmphoraBootingInDB(
requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
wait_flow = linear_flow.Flow('wait_for_amphora',
retry=retry.Times(CONF.
controller_worker.
amp_active_retries))
wait_flow.add(compute_tasks.ComputeWait())
wait_flow.add(compute_tasks.ComputeWait(
requires=constants.COMPUTE_ID,
provides=constants.COMPUTE_OBJ))
wait_flow.add(database_tasks.UpdateAmphoraInfo(
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
provides=constants.AMPHORA))
create_amp_for_lb_flow.add(wait_flow)
create_amp_for_lb_flow.add(amphora_driver_tasks.
AmphoraFinalize())
create_amp_for_lb_flow.add(database_tasks.
MarkAmphoraAllocatedInDB(
requires='loadbalancer'))
create_amp_for_lb_flow.add(database_tasks.GetAmphoraByID(
requires='amphora',
provides='updated_amphora'))
create_amp_for_lb_flow.add(database_tasks.GetLoadbalancerByID(
requires='loadbalancer',
provides='updated_loadbalancer'))
create_amp_for_lb_flow.add(amphora_driver_tasks.AmphoraFinalize(
requires=constants.AMPHORA))
create_amp_for_lb_flow.add(
database_tasks.MarkAmphoraAllocatedInDB(
requires=(constants.AMPHORA, constants.LOADBALANCER_ID),
provides=constants.LOADBALANCER))
create_amp_for_lb_flow.add(
database_tasks.ReloadAmphora(requires=constants.AMPHORA_ID,
provides=constants.AMPHORA))
create_amp_for_lb_flow.add(
database_tasks.ReloadLoadBalancer(
name=constants.RELOAD_LB_AFTER_AMP_ASSOC,
requires=constants.LOADBALANCER_ID,
provides=constants.LOADBALANCER))
new_LB_net_subflow = self._lb_flows.get_new_LB_networking_subflow()
create_amp_for_lb_flow.add(new_LB_net_subflow)
create_amp_for_lb_flow.add(database_tasks.MarkLBActiveInDB(
requires='loadbalancer'))
requires=constants.LOADBALANCER))
return create_amp_for_lb_flow

View File

@ -40,19 +40,21 @@ class LoadBalancerFlows(object):
# https://review.openstack.org/#/c/98946/
create_LB_flow = linear_flow.Flow(constants.CREATE_LOADBALANCER_FLOW)
create_LB_flow.add(database_tasks.MapLoadbalancerToAmphora(
requires='loadbalancer',
provides='amphora'))
create_LB_flow.add(database_tasks.GetAmphoraByID(
requires='amphora',
provides='updated_amphora'))
create_LB_flow.add(database_tasks.GetLoadbalancerByID(
requires='loadbalancer',
provides='updated_loadbalancer'))
requires=constants.LOADBALANCER_ID,
provides=constants.AMPHORA_ID))
create_LB_flow.add(database_tasks.ReloadAmphora(
requires=constants.AMPHORA_ID,
provides=constants.AMPHORA))
create_LB_flow.add(database_tasks.ReloadLoadBalancer(
name=constants.RELOAD_LB_AFTER_AMP_ASSOC,
requires=constants.LOADBALANCER_ID,
provides=constants.LOADBALANCER))
new_LB_net_subflow = self.get_new_LB_networking_subflow()
create_LB_flow.add(new_LB_net_subflow)
create_LB_flow.add(database_tasks.MarkLBActiveInDB(
requires='loadbalancer'))
requires=constants.LOADBALANCER))
return create_LB_flow
@ -78,22 +80,23 @@ class LoadBalancerFlows(object):
new_LB_net_subflow = linear_flow.Flow(constants.
LOADBALANCER_NETWORKING_SUBFLOW)
new_LB_net_subflow.add(network_tasks.GetPlumbedNetworks(
rebind={'amphora': 'updated_amphora'},
provides='nics'))
new_LB_net_subflow.add(network_tasks.CalculateDelta(
rebind={'amphora': 'updated_amphora'},
requires='nics',
provides='delta'))
new_LB_net_subflow.add(network_tasks.PlugNetworks(
rebind={'amphora': 'updated_amphora'},
requires='delta'))
new_LB_net_subflow.add(amphora_driver_tasks.AmphoraPostNetworkPlug(
rebind={'amphora': 'updated_amphora'}))
new_LB_net_subflow.add(network_tasks.AllocateVIP(
requires=constants.LOADBALANCER,
provides=constants.VIP))
new_LB_net_subflow.add(database_tasks.UpdateVIPAfterAllocation(
requires=(constants.LOADBALANCER_ID, constants.VIP),
provides=constants.LOADBALANCER))
new_LB_net_subflow.add(network_tasks.PlugVIP(
rebind={'amphora': 'updated_amphora'}))
requires=constants.LOADBALANCER,
provides=constants.AMPS_DATA))
new_LB_net_subflow.add(database_tasks.UpdateAmphoraVIPData(
requires=constants.AMPS_DATA))
new_LB_net_subflow.add(database_tasks.ReloadLoadBalancer(
name=constants.RELOAD_LB_AFTER_PLUG_VIP,
requires=constants.LOADBALANCER_ID,
provides=constants.LOADBALANCER))
new_LB_net_subflow.add(amphora_driver_tasks.AmphoraPostVIPPlug(
rebind={'loadbalancer': 'updated_loadbalancer'}))
requires=constants.LOADBALANCER))
return new_LB_net_subflow

View File

@ -18,6 +18,7 @@ import logging
from oslo.config import cfg
from stevedore import driver as stevedore_driver
from taskflow import task
from taskflow.types import failure
from octavia.common import constants
from octavia.db import api as db_apis
@ -110,7 +111,6 @@ class ListenerDelete(BaseAmphoraTask):
LOG.warn(_LW("Reverting listener delete."))
self.listener_repo.update(db_apis.get_session(), id=listener.id,
provisioning_status=constants.ERROR)
return None
class AmphoraGetInfo(BaseAmphoraTask):
@ -137,13 +137,13 @@ class AmphoraFinalize(BaseAmphoraTask):
self.amphora_driver.finalize_amphora(amphora)
LOG.debug("Finalized the amphora.")
def revert(self, amphora, *args, **kwargs):
def revert(self, result, amphora, *args, **kwargs):
"""Handle a failed amphora finalize."""
if isinstance(result, failure.Failure):
return
LOG.warn(_LW("Reverting amphora finalize."))
self.amphora_repo.update(db_apis.get_session(), id=amphora.id,
provisioning_status=constants.ERROR)
return None
status=constants.ERROR)
class AmphoraPostNetworkPlug(BaseAmphoraTask):
@ -154,12 +154,13 @@ class AmphoraPostNetworkPlug(BaseAmphoraTask):
self.amphora_driver.post_network_plug(amphora)
LOG.debug("Posted network plug for the compute instance")
def revert(self, amphora, *args, **kwargs):
def revert(self, result, amphora, *args, **kwargs):
"""Handle a failed post network plug."""
if isinstance(result, failure.Failure):
return
LOG.warn(_LW("Reverting post network plug."))
self.amphora_repo.update(db_apis.get_session(), id=amphora.id,
provisioning_status=constants.ERROR)
return None
status=constants.ERROR)
class AmphoraPostVIPPlug(BaseAmphoraTask):
@ -170,10 +171,11 @@ class AmphoraPostVIPPlug(BaseAmphoraTask):
self.amphora_driver.post_vip_plug(loadbalancer)
LOG.debug("Notfied amphora of vip plug")
def revert(self, loadbalancer, *args, **kwargs):
def revert(self, result, loadbalancer, *args, **kwargs):
"""Handle a failed amphora vip plug notification."""
if isinstance(result, failure.Failure):
return
LOG.warn(_LW("Reverting post vip plug."))
self.loadbalancer_repo.update(db_apis.get_session(),
id=loadbalancer.id,
provisioning_status=constants.ERROR)
return None
status=constants.ERROR)

View File

@ -19,6 +19,7 @@ import time
from oslo.config import cfg
from stevedore import driver as stevedore_driver
from taskflow import task
from taskflow.types import failure
from octavia.common import constants
from octavia.common import exceptions
@ -45,66 +46,62 @@ class BaseComputeTask(task.Task):
class ComputeCreate(BaseComputeTask):
"""Create the compute instance for a new amphora."""
def execute(self, amphora):
def execute(self, amphora_id):
"""Create an amphora
:returns: an amphora
"""
LOG.debug("Nova Create execute for amphora with id %s" % amphora.id)
LOG.debug("Nova Create execute for amphora with id %s" % amphora_id)
try:
# todo(german): add security groups
compute_id = self.compute.build(
name="amphora-" + amphora.id,
name="amphora-" + amphora_id,
amphora_flavor=CONF.controller_worker.amp_flavor_id,
image_id=CONF.controller_worker.amp_image_id,
key_name=CONF.controller_worker.amp_ssh_key_name,
sec_groups=None,
network_ids=CONF.controller_worker.amp_network)
sec_groups=CONF.controller_worker.amp_secgroup_list,
network_ids=[CONF.controller_worker.amp_network])
LOG.debug("Server created with id: %s for amphora id: %s" %
(compute_id, amphora.id))
amphora.compute_id = compute_id
return amphora
(compute_id, amphora_id))
return compute_id
except Exception as e:
LOG.error(_LE("Nova create for amphora id: %(amp)s "
"failed: %(exp)s"),
{'amp': amphora.id, 'exp': e})
{'amp': amphora_id, 'exp': e})
raise e
def revert(self, amphora, *args, **kwargs):
def revert(self, result, amphora_id, *args, **kwargs):
"""This method will revert the creation of the
amphora. So it will just delete it in this flow
"""
if isinstance(result, failure.Failure):
return
compute_id = result
LOG.warn(_LW("Reverting Nova create for amphora with id"
"%(amp)s and compute id: %(comp)s"),
{'amp': amphora.id, 'comp': amphora.compute_id})
{'comp': compute_id})
try:
self.compute.delete(amphora.compute_id)
amphora.compute_id = None
self.compute.delete(compute_id)
except Exception as e:
LOG.error(_LE("Reverting Nova create failed"
" with exception %s"), e)
return
class ComputeWait(BaseComputeTask):
"""Wait for the compute driver to mark the amphora active."""
def execute(self, amphora):
def execute(self, compute_id):
"""Wait for the compute driver to mark the amphora active
:raises: Generic exception if the amphora is not active
:returns: An amphora object
"""
time.sleep(CONF.controller_worker.amp_active_wait_sec)
amp = self.compute.get_amphora(amphora.compute_id)
amp = self.compute.get_amphora(compute_id)
if amp.status == constants.ACTIVE:
amphora.lb_network_ip = amp.lb_network_ip
return amphora
return amp
raise exceptions.ComputeWaitTimeoutException()

View File

@ -17,8 +17,10 @@ import logging
from oslo_utils import uuidutils
from taskflow import task
from taskflow.types import failure
from octavia.common import constants
from octavia.common import exceptions
from octavia.db import api as db_apis
from octavia.db import repositories as repo
from octavia.i18n import _LW
@ -30,6 +32,7 @@ class BaseDatabaseTask(task.Task):
"""Base task to load drivers common to the tasks."""
def __init__(self, **kwargs):
self.repos = repo.Repositories()
self.amphora_repo = repo.AmphoraRepository()
self.health_mon_repo = repo.HealthMonitorRepository()
self.listener_repo = repo.ListenerRepository()
@ -42,8 +45,6 @@ class BaseDatabaseTask(task.Task):
class CreateAmphoraInDB(BaseDatabaseTask):
"""Task to create an initial amphora in the Database."""
default_provides = constants.AMPHORA
def execute(self, *args, **kwargs):
"""Creates an pending create amphora record in the database.
@ -55,26 +56,28 @@ class CreateAmphoraInDB(BaseDatabaseTask):
status=constants.PENDING_CREATE)
LOG.debug("Created Amphora in DB with id %s" % amphora.id)
return amphora
return amphora.id
def revert(self, *args, **kwargs):
def revert(self, result, *args, **kwargs):
"""Revert by storing the amphora in error state in the DB
In a future version we might change the status to DELETED
if deleting the amphora was successful
"""
if 'result' not in kwargs:
return None # nothing to do
if isinstance(result, failure.Failure):
# This task's execute failed, so nothing needed to be done to
# revert
return
# amphora = kwargs['result']
# TODO(johnsom) fix
# LOG.warn(_LW("Reverting create amphora in DB for amp id %s "),
# amphora.id)
# At this point the revert is being called because another task
# executed after this failed so we will need to do something and
# result is the amphora's id
# _amphora_repo.update(db_apis.get_session(), amphora.id,
# status=constants.ERROR,
# compute_id=amphora.compute_id)
LOG.warn(_LW("Reverting create amphora in DB for amp id %s "), result)
# Delete the amphora for now. May want to just update status later
self.amphora_repo.delete(db_apis.get_session(), id=result)
class DeleteHealthMonitorInDB(BaseDatabaseTask):
@ -167,10 +170,10 @@ class DeletePoolInDB(BaseDatabaseTask):
# operating_status=constants.ERROR)
class GetAmphoraByID(BaseDatabaseTask):
class ReloadAmphora(BaseDatabaseTask):
"""Get an amphora object from the database."""
def execute(self, amphora):
def execute(self, amphora_id):
"""Get an amphora object from the database.
:param amphora_id: The amphora ID to lookup
@ -178,14 +181,14 @@ class GetAmphoraByID(BaseDatabaseTask):
"""
LOG.debug("Get amphora from DB for amphora id: %s " %
amphora.id)
return self.amphora_repo.get(db_apis.get_session(), id=amphora.id)
amphora_id)
return self.amphora_repo.get(db_apis.get_session(), id=amphora_id)
class GetLoadbalancerByID(BaseDatabaseTask):
class ReloadLoadBalancer(BaseDatabaseTask):
"""Get an load balancer object from the database."""
def execute(self, loadbalancer):
def execute(self, loadbalancer_id, *args, **kwargs):
"""Get an load balancer object from the database.
:param loadbalancer_id: The load balancer ID to lookup
@ -193,15 +196,34 @@ class GetLoadbalancerByID(BaseDatabaseTask):
"""
LOG.debug("Get load balancer from DB for load balancer id: %s " %
loadbalancer.id)
loadbalancer_id)
return self.loadbalancer_repo.get(db_apis.get_session(),
id=loadbalancer.id)
id=loadbalancer_id)
class UpdateVIPAfterAllocation(BaseDatabaseTask):
def execute(self, loadbalancer_id, vip):
self.repos.vip.update(db_apis.get_session(), loadbalancer_id,
port_id=vip.port_id, network_id=vip.network_id,
ip_address=vip.ip_address)
return self.repos.load_balancer.get(db_apis.get_session(),
id=loadbalancer_id)
class UpdateAmphoraVIPData(BaseDatabaseTask):
def execute(self, amps_data):
for amp_data in amps_data:
self.repos.amphora.update(db_apis.get_session(), amp_data.id,
vrrp_ip=amp_data.vrrp_ip,
ha_ip=amp_data.ha_ip)
class MapLoadbalancerToAmphora(BaseDatabaseTask):
"""Maps and assigns a load balancer to an amphora in the database."""
def execute(self, loadbalancer):
def execute(self, loadbalancer_id):
"""Allocates an Amphora for the load balancer in the database.
:param lb_id: The load balancer id to map to an amphora
@ -210,18 +232,20 @@ class MapLoadbalancerToAmphora(BaseDatabaseTask):
"""
LOG.debug("Allocating an Amphora for load balancer with id %s" %
loadbalancer.id)
loadbalancer_id)
amp = self.amphora_repo.allocate_and_associate(
db_apis.get_session(),
loadbalancer.id)
loadbalancer_id)
if amp is None:
LOG.debug("No Amphora available for load balancer with id %s" %
loadbalancer.id)
else:
LOG.debug("Allocated Amphora with id %s for load balancer "
"with id %s" % (amp.id, loadbalancer.id))
return amp
loadbalancer_id)
raise exceptions.NoReadyAmphoraeException()
LOG.debug("Allocated Amphora with id %s for load balancer "
"with id %s" % (amp.id, loadbalancer_id))
return amp.id
class MarkAmphoraAllocatedInDB(BaseDatabaseTask):
@ -231,21 +255,24 @@ class MarkAmphoraAllocatedInDB(BaseDatabaseTask):
retried sufficiently - so just abort
"""
def execute(self, amphora, loadbalancer):
def execute(self, amphora, loadbalancer_id):
"""Mark amphora as allocated to a load balancer in DB."""
LOG.debug("Mark ALLOCATED in DB for amphora: %s with compute id %s "
"for load balancer: %s" %
(amphora.id, amphora.compute_id, loadbalancer.id))
(amphora.id, amphora.compute_id, loadbalancer_id))
self.amphora_repo.update(db_apis.get_session(), amphora.id,
status=constants.AMPHORA_ALLOCATED,
compute_id=amphora.compute_id,
lb_network_ip=amphora.lb_network_ip,
load_balancer_id=loadbalancer.id)
load_balancer_id=loadbalancer_id)
def revert(self, amphora, *args, **kwargs):
def revert(self, result, amphora, loadbalancer_id, *args, **kwargs):
"""Mark the amphora as broken and ready to be cleaned up."""
if isinstance(result, failure.Failure):
return
LOG.warn(_LW("Reverting mark amphora ready in DB for amp "
"id %(amp)s and compute id %(comp)s"),
{'amp': amphora.id, 'comp': amphora.compute_id})
@ -256,24 +283,27 @@ class MarkAmphoraAllocatedInDB(BaseDatabaseTask):
class MarkAmphoraBootingInDB(BaseDatabaseTask):
"""Mark the amphora as booting in the database."""
def execute(self, amphora):
def execute(self, amphora_id, compute_id):
"""Mark amphora booting in DB."""
LOG.debug("Mark BOOTING in DB for amphora: %s with compute id %s" %
(amphora.id, amphora.compute_id))
self.amphora_repo.update(db_apis.get_session(), amphora.id,
(amphora_id, compute_id))
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.AMPHORA_BOOTING,
compute_id=amphora.compute_id)
compute_id=compute_id)
def revert(self, amphora, *args, **kwargs):
def revert(self, result, amphora_id, compute_id, *args, **kwargs):
"""Mark the amphora as broken and ready to be cleaned up."""
if isinstance(result, failure.Failure):
return
LOG.warn(_LW("Reverting mark amphora booting in DB for amp "
"id %(amp)s and compute id %(comp)s"),
{'amp': amphora.id, 'comp': amphora.compute_id})
self.amphora_repo.update(db_apis.get_session(), amphora.id,
{'amp': amphora_id, 'comp': compute_id})
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR,
compute_id=amphora.compute_id)
compute_id=compute_id)
class MarkAmphoraDeletedInDB(BaseDatabaseTask):
@ -380,6 +410,22 @@ class MarkAmphoraReadyInDB(BaseDatabaseTask):
lb_network_ip=amphora.lb_network_ip)
class UpdateAmphoraComputeId(BaseDatabaseTask):
def execute(self, amphora_id, compute_id):
self.amphora_repo.update(db_apis.get_session(), amphora_id,
compute_id=compute_id)
return self.amphora_repo.get(db_apis.get_session(), id=amphora_id)
class UpdateAmphoraInfo(BaseDatabaseTask):
def execute(self, amphora_id, compute_obj):
self.amphora_repo.update(db_apis.get_session(), amphora_id,
lb_network_ip=compute_obj.lb_network_ip)
return self.amphora_repo.get(db_apis.get_session(), id=amphora_id)
class MarkLBActiveInDB(BaseDatabaseTask):
"""Mark the load balancer active in the DB.

View File

@ -18,6 +18,7 @@ import logging
from oslo.config import cfg
from stevedore import driver as stevedore_driver
from taskflow import task
from taskflow.types import failure
from octavia.common import constants
from octavia.i18n import _LW, _LE
@ -179,48 +180,51 @@ class UnPlugNetworks(BaseNetworkTask):
class PlugVIP(BaseNetworkTask):
"""Task to plumb a VIP."""
def execute(self, amphora):
def execute(self, loadbalancer):
"""Plumb a vip to an amphora."""
# Likely needs to be a subflow!
LOG.debug("Plumbing VIP for loadbalancer id: %s" % loadbalancer.id)
LOG.debug("Plumbing VIP for amphora id: %s" % amphora.id)
amps_data = self.network_driver.plug_vip(loadbalancer,
loadbalancer.vip)
return amps_data
vip = self.network_driver.plug_vip(amphora.load_balancer,
amphora.load_balancer.vip)
amphora.load_balancer.vip = vip
return
def revert(self, amphora):
def revert(self, result, loadbalancer, *args, **kwargs):
"""Handle a failure to plumb a vip."""
LOG.warn(_LW("Unable to plug VIP for amp id %s"), amphora.id)
if isinstance(result, failure.Failure):
return
LOG.warn(_LW("Unable to plug VIP for loadbalancer id %s"),
loadbalancer.id)
self.network_driver.unplug_vip(amphora.load_balancer,
amphora.load_balancer.vip)
return
self.network_driver.unplug_vip(loadbalancer, loadbalancer.vip)
class AllocateVIP(BaseNetworkTask):
"""Task to allocate a VIP."""
def execute(self, port_id, network_id, ip_address):
def execute(self, loadbalancer):
"""Allocate a vip to the loadbalancer."""
LOG.debug("Allocate_vip port_id %s, network_id %s,"
"ip_address %s",
port_id, network_id, ip_address)
return self.network_driver.allocate_vip(port_id,
network_id, ip_address)
loadbalancer.vip.port_id,
loadbalancer.vip.network_id,
loadbalancer.vip.ip_address)
return self.network_driver.allocate_vip(
port_id=loadbalancer.vip.port_id,
network_id=loadbalancer.vip.network_id,
ip_address=loadbalancer.vip.ip_address)
def revert(self, vip):
def revert(self, result, loadbalancer, *args, **kwargs):
"""Handle a failure to allocate vip."""
LOG.warn(_LW("Unable to allocate VIP %s"), vip.ip_address)
if isinstance(result, failure.Failure):
LOG.exception(_LE("Unable to allocate VIP"))
return
vip = result
LOG.warn(_LW("Deallocating vip %s"), vip.ip_address)
self.network_driver.deallocate_vip(vip)
return
class DeallocateVIP(BaseNetworkTask):

View File

@ -23,3 +23,6 @@ class TestConfig(base.TestCase):
def test_sanity(self):
config.init([])
config.setup_logging(cfg.CONF)
# Resetting because this will cause inconsistent errors when run with
# other tests
self.addCleanup(cfg.CONF.reset)

View File

@ -17,6 +17,7 @@ from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from taskflow.patterns import linear_flow as flow
from octavia.common import constants
from octavia.controller.worker.flows import amphora_flows
import octavia.tests.unit.base as base
@ -38,9 +39,11 @@ class TestAmphoraFlows(base.TestCase):
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn('amphora', amp_flow.provides)
self.assertIn(constants.AMPHORA, amp_flow.provides)
self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
self.assertEqual(len(amp_flow.provides), 1)
self.assertEqual(len(amp_flow.provides), 3)
self.assertEqual(len(amp_flow.requires), 0)
def test_get_create_amphora_for_lb_flow(self):
@ -49,10 +52,16 @@ class TestAmphoraFlows(base.TestCase):
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn('amphora', amp_flow.provides)
self.assertIn('loadbalancer', amp_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
self.assertIn(constants.AMPHORA, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertIn(constants.VIP, amp_flow.provides)
self.assertIn(constants.AMPS_DATA, amp_flow.provides)
self.assertIn(constants.AMPHORA_ID, amp_flow.provides)
self.assertIn(constants.COMPUTE_ID, amp_flow.provides)
self.assertIn(constants.COMPUTE_OBJ, amp_flow.provides)
self.assertEqual(len(amp_flow.provides), 5)
self.assertEqual(len(amp_flow.provides), 7)
self.assertEqual(len(amp_flow.requires), 1)
def test_get_delete_amphora_flow(self):
@ -61,7 +70,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn('amphora', amp_flow.requires)
self.assertIn(constants.AMPHORA, amp_flow.requires)
self.assertEqual(len(amp_flow.provides), 0)
self.assertEqual(len(amp_flow.requires), 1)

View File

@ -15,6 +15,7 @@
from taskflow.patterns import linear_flow as flow
from octavia.common import constants
from octavia.controller.worker.flows import load_balancer_flows
import octavia.tests.unit.base as base
@ -32,11 +33,13 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIsInstance(lb_flow, flow.Flow)
self.assertIn('amphora', lb_flow.provides)
self.assertIn('delta', lb_flow.provides)
self.assertIn('nics', lb_flow.provides)
self.assertIn(constants.AMPHORA, lb_flow.provides)
self.assertIn(constants.AMPHORA_ID, lb_flow.provides)
self.assertIn(constants.VIP, lb_flow.provides)
self.assertIn(constants.AMPS_DATA, lb_flow.provides)
self.assertIn(constants.LOADBALANCER, lb_flow.provides)
self.assertIn('loadbalancer', lb_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, lb_flow.requires)
self.assertEqual(len(lb_flow.provides), 5)
self.assertEqual(len(lb_flow.requires), 1)
@ -58,10 +61,14 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIsInstance(lb_flow, flow.Flow)
self.assertIn('updated_loadbalancer', lb_flow.requires)
self.assertIn('updated_amphora', lb_flow.requires)
self.assertIn(constants.VIP, lb_flow.provides)
self.assertIn(constants.AMPS_DATA, lb_flow.provides)
self.assertIn(constants.LOADBALANCER, lb_flow.provides)
self.assertEqual(len(lb_flow.provides), 2)
self.assertIn(constants.LOADBALANCER, lb_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, lb_flow.requires)
self.assertEqual(len(lb_flow.provides), 3)
self.assertEqual(len(lb_flow.requires), 2)
def test_get_update_load_balancer_flow(self):

View File

@ -179,11 +179,11 @@ class TestDatabaseTasks(base.TestCase):
_amphora_mock)
# Test revert
amp = amphora_finalize_obj.revert(_amphora_mock)
amp = amphora_finalize_obj.revert(None, _amphora_mock)
repo.AmphoraRepository.update.assert_called_once_with(
'TEST',
id=AMP_ID,
provisioning_status=constants.ERROR)
status=constants.ERROR)
self.assertIsNone(amp)
def test_amphora_post_network_plug(self,
@ -202,11 +202,11 @@ class TestDatabaseTasks(base.TestCase):
assert_called_once_with)(_amphora_mock)
# Test revert
amp = amphora_post_network_plug_obj.revert(_amphora_mock)
amp = amphora_post_network_plug_obj.revert(None, _amphora_mock)
repo.AmphoraRepository.update.assert_called_once_with(
'TEST',
id=AMP_ID,
provisioning_status=constants.ERROR)
status=constants.ERROR)
self.assertIsNone(amp)
@ -228,10 +228,10 @@ class TestDatabaseTasks(base.TestCase):
assert_called_once_with)(_LB_mock)
# Test revert
amp = amphora_post_vip_plug_obj.revert(_LB_mock)
amp = amphora_post_vip_plug_obj.revert(None, _LB_mock)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
id=LB_ID,
provisioning_status=constants.ERROR)
status=constants.ERROR)
self.assertIsNone(amp)

View File

@ -29,7 +29,7 @@ AMP_FLAVOR_ID = 10
AMP_IMAGE_ID = 11
AMP_SSH_KEY_NAME = None
AMP_NET = uuidutils.generate_uuid()
AMP_SEC_GROUPS = None
AMP_SEC_GROUPS = []
AMP_WAIT = 12
AMPHORA_ID = uuidutils.generate_uuid()
COMPUTE_ID = uuidutils.generate_uuid()
@ -68,43 +68,39 @@ class TestComputeTasks(base.TestCase):
super(TestComputeTasks, self).setUp()
@mock.patch('stevedore.driver.DriverManager.driver')
def test_compute_create(self,
mock_driver):
def test_compute_create(self, mock_driver):
mock_driver.build.side_effect = [COMPUTE_ID, TestException('test')]
# Test execute()
createcompute = compute_tasks.ComputeCreate()
amphora = createcompute.execute(_amphora_mock)
compute_id = createcompute.execute(_amphora_mock.id)
# Validate that the build method was called properly
mock_driver.build.assert_called_once_with(
name="amphora-" + AMPHORA_ID,
name="amphora-" + _amphora_mock.id,
amphora_flavor=AMP_FLAVOR_ID,
image_id=AMP_IMAGE_ID,
key_name=AMP_SSH_KEY_NAME,
sec_groups=AMP_SEC_GROUPS,
network_ids=AMP_NET)
network_ids=[AMP_NET])
# Make sure it returns the expected compute_id
assert(amphora.compute_id == COMPUTE_ID)
assert(compute_id == COMPUTE_ID)
# Test that a build exception is raised
createcompute = compute_tasks.ComputeCreate()
self.assertRaises(TestException,
createcompute.execute,
amphora=_amphora_mock)
_amphora_mock)
# Test revert()
_amphora_mock.compute_id = COMPUTE_ID
createcompute = compute_tasks.ComputeCreate()
createcompute.revert(_amphora_mock)
# Validate that the compute_id is cleared
self.assertIsNone(_amphora_mock.compute_id)
createcompute.revert(compute_id, _amphora_mock.id)
# Validate that the delete method was called properly
mock_driver.delete.assert_called_once_with(
@ -112,7 +108,7 @@ class TestComputeTasks(base.TestCase):
# Test that a delete exception is not raised
createcompute.revert(_amphora_mock)
createcompute.revert(COMPUTE_ID, _amphora_mock.id)
@mock.patch('stevedore.driver.DriverManager.driver')
@mock.patch('time.sleep')
@ -127,14 +123,12 @@ class TestComputeTasks(base.TestCase):
mock_driver.get_amphora.return_value = _amphora_mock
computewait = compute_tasks.ComputeWait()
amphora = computewait.execute(_amphora_mock)
computewait.execute(COMPUTE_ID)
time.sleep.assert_called_once_with(AMP_WAIT)
mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID)
assert(amphora.lb_network_ip == LB_NET_IP)
_amphora_mock.status = constants.DELETED
self.assertRaises(exceptions.ComputeWaitTimeoutException,

View File

@ -15,8 +15,10 @@
import mock
from oslo_utils import uuidutils
from taskflow.types import failure
from octavia.common import constants
from octavia.common import exceptions
from octavia.controller.worker.tasks import database_tasks
from octavia.db import repositories as repo
import octavia.tests.unit.base as base
@ -36,8 +38,10 @@ _amphora_mock.compute_id = COMPUTE_ID
_amphora_mock.lb_network_ip = LB_NET_IP
_loadbalancer_mock = mock.MagicMock()
_loadbalancer_mock.id = LB_ID
_tf_failure_mock = mock.Mock(spec=failure.Failure)
@mock.patch('octavia.db.repositories.AmphoraRepository.delete')
@mock.patch('octavia.db.repositories.AmphoraRepository.update')
@mock.patch('octavia.db.repositories.ListenerRepository.update')
@mock.patch('octavia.db.repositories.LoadBalancerRepository.update')
@ -74,22 +78,23 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
create_amp_in_db = database_tasks.CreateAmphoraInDB()
amp = create_amp_in_db.execute()
amp_id = create_amp_in_db.execute()
repo.AmphoraRepository.create.assert_called_once_with(
'TEST',
id=AMP_ID,
status=constants.PENDING_CREATE)
assert(amp == _amphora_mock)
assert(amp_id == _amphora_mock.id)
# Test the revert
# TODO(johnsom) finish when this method is updated
amp = create_amp_in_db.revert()
amp = create_amp_in_db.revert(_tf_failure_mock)
self.assertIsNone(amp)
@ -111,7 +116,8 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
delete_health_mon = database_tasks.DeleteHealthMonitorInDB()
delete_health_mon.execute(HM_ID)
@ -139,7 +145,8 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
delete_member = database_tasks.DeleteMemberInDB()
delete_member.execute(MEMBER_ID)
@ -166,7 +173,8 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
delete_pool = database_tasks.DeletePoolInDB()
delete_pool.execute(POOL_ID)
@ -188,17 +196,18 @@ class TestDatabaseTasks(base.TestCase):
@mock.patch('octavia.db.repositories.AmphoraRepository.get',
return_value=_amphora_mock)
def test_get_amphora_by_id(self,
mock_amp_get,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
def test_reload_amphora(self,
mock_amp_get,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
get_amp_by_id = database_tasks.GetAmphoraByID()
amp = get_amp_by_id.execute(_amphora_mock)
reload_amp = database_tasks.ReloadAmphora()
amp = reload_amp.execute(AMP_ID)
repo.AmphoraRepository.get.assert_called_once_with(
'TEST',
@ -208,17 +217,18 @@ class TestDatabaseTasks(base.TestCase):
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get',
return_value=_loadbalancer_mock)
def test_get_loadbalancer_by_id(self,
mock_lb_get,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
def test_reload_load_balancer(self,
mock_lb_get,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
get_lb_by_id = database_tasks.GetLoadbalancerByID()
lb = get_lb_by_id.execute(_loadbalancer_mock)
reload_lb = database_tasks.ReloadLoadBalancer()
lb = reload_lb.execute(LB_ID)
repo.LoadBalancerRepository.get.assert_called_once_with(
'TEST',
@ -236,20 +246,20 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
map_lb_to_amp = database_tasks.MapLoadbalancerToAmphora()
amp = map_lb_to_amp.execute(self.loadbalancer_mock)
amp_id = map_lb_to_amp.execute(self.loadbalancer_mock.id)
repo.AmphoraRepository.allocate_and_associate.assert_called_once_with(
'TEST',
LB_ID)
assert amp == _amphora_mock
assert amp_id == _amphora_mock.id
amp = map_lb_to_amp.execute(self.loadbalancer_mock)
self.assertIsNone(amp)
self.assertRaises(exceptions.NoReadyAmphoraeException,
map_lb_to_amp.execute, self.loadbalancer_mock.id)
@mock.patch('octavia.db.repositories.AmphoraRepository.get',
return_value=_amphora_mock)
@ -263,12 +273,13 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_amp_allocated_in_db = (database_tasks.
MarkAmphoraAllocatedInDB())
mark_amp_allocated_in_db.execute(_amphora_mock,
self.loadbalancer_mock)
self.loadbalancer_mock.id)
repo.AmphoraRepository.update.assert_called_once_with(
'TEST',
@ -281,7 +292,8 @@ class TestDatabaseTasks(base.TestCase):
# Test the revert
mock_amphora_repo_update.reset_mock()
mark_amp_allocated_in_db.revert(_amphora_mock)
mark_amp_allocated_in_db.revert(None, _amphora_mock,
self.loadbalancer_mock.id)
repo.AmphoraRepository.update.assert_called_once_with(
'TEST',
@ -294,10 +306,12 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_amp_booting_in_db = database_tasks.MarkAmphoraBootingInDB()
mark_amp_booting_in_db.execute(_amphora_mock)
mark_amp_booting_in_db.execute(_amphora_mock.id,
_amphora_mock.compute_id)
repo.AmphoraRepository.update.assert_called_once_with(
'TEST',
@ -308,7 +322,8 @@ class TestDatabaseTasks(base.TestCase):
# Test the revert
mock_amphora_repo_update.reset_mock()
mark_amp_booting_in_db.revert(_amphora_mock)
mark_amp_booting_in_db.revert(None, _amphora_mock.id,
_amphora_mock.compute_id)
repo.AmphoraRepository.update.assert_called_once_with(
'TEST',
@ -322,7 +337,8 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_amp_deleted_in_db = database_tasks.MarkAmphoraDeletedInDB()
mark_amp_deleted_in_db.execute(_amphora_mock)
@ -348,7 +364,8 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_amp_pending_delete_in_db = (database_tasks.
MarkAmphoraPendingDeleteInDB())
@ -375,7 +392,8 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_amp_pending_update_in_db = (database_tasks.
MarkAmphoraPendingUpdateInDB())
@ -402,7 +420,8 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
_amphora_mock.lb_network_ip = LB_NET_IP
@ -434,7 +453,8 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_listener_active = database_tasks.MarkListenerActiveInDB()
mark_listener_active.execute(self.listener_mock)
@ -460,7 +480,8 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_listener_deleted = database_tasks.MarkListenerDeletedInDB()
mark_listener_deleted.execute(self.listener_mock)
@ -486,7 +507,8 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_listener_pending_delete = (database_tasks.
MarkListenerPendingDeleteInDB())
@ -513,7 +535,8 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_lb_and_listener_active = (database_tasks.
MarkLBAndListenerActiveInDB())
@ -552,7 +575,8 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_loadbalancer_active = database_tasks.MarkLBActiveInDB()
mark_loadbalancer_active.execute(self.loadbalancer_mock)
@ -578,7 +602,8 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_loadbalancer_deleted = database_tasks.MarkLBDeletedInDB()
mark_loadbalancer_deleted.execute(self.loadbalancer_mock)
@ -604,7 +629,8 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_loadbalancer_pending_delete = (database_tasks.
MarkLBPendingDeleteInDB())
@ -633,7 +659,8 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
update_health_mon = database_tasks.UpdateHealthMonInDB()
update_health_mon.execute(self.health_mon_mock,
@ -663,7 +690,8 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
update_listener = database_tasks.UpdateListenerInDB()
update_listener.execute(self.listener_mock,
@ -693,7 +721,8 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
update_member = database_tasks.UpdateMemberInDB()
update_member.execute(self.member_mock,
@ -723,7 +752,8 @@ class TestDatabaseTasks(base.TestCase):
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update):
mock_amphora_repo_update,
mock_amphora_repo_delete):
update_pool = database_tasks.UpdatePoolInDB()
update_pool.execute(self.pool_mock,

View File

@ -18,6 +18,7 @@ from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
from octavia.common import data_models as o_data_models
from octavia.controller.worker.tasks import network_tasks
from octavia.network import base as net_base
from octavia.network import data_models
@ -29,6 +30,9 @@ COMPUTE_ID = uuidutils.generate_uuid()
PORT_ID = uuidutils.generate_uuid()
NETWORK_ID = uuidutils.generate_uuid()
IP_ADDRESS = "172.24.41.1"
VIP = o_data_models.Vip(port_id=PORT_ID, network_id=NETWORK_ID,
ip_address=IP_ADDRESS)
LB = o_data_models.LoadBalancer(vip=VIP)
class TestException(Exception):
@ -173,35 +177,28 @@ class TestNetworkTasks(base.TestCase):
mock_driver):
net = network_tasks.PlugVIP()
lb_mock = mock.MagicMock()
self.amphora_mock.load_balancer = lb_mock
vip_mock = mock.MagicMock()
lb_mock.vip = vip_mock
mock_driver.plug_vip.return_value = ["vip"]
mock_driver.plug_vip.side_effect = ["vip"]
net.execute(self.amphora_mock)
mock_driver.plug_vip.assert_called_once_with(lb_mock, vip_mock)
self.assertEqual("vip", lb_mock.vip)
data = net.execute(LB)
mock_driver.plug_vip.assert_called_once_with(LB, LB.vip)
self.assertEqual(["vip"], data)
# revert
lb_mock.vip = vip_mock
net.revert(self.amphora_mock)
mock_driver.unplug_vip.assert_called_once_with(lb_mock, vip_mock)
net.revert(["vip"], LB)
mock_driver.unplug_vip.assert_called_once_with(LB, LB.vip)
def test_allocate_vip(self, mock_driver):
net = network_tasks.AllocateVIP()
mock_driver.allocate_vip.side_effect = ["vip"]
mock_driver.allocate_vip.return_value = LB.vip
mock_driver.reset_mock()
self.assertEqual("vip", net.execute(PORT_ID, NETWORK_ID, IP_ADDRESS))
mock_driver.allocate_vip.assert_called_once_with(PORT_ID,
NETWORK_ID,
IP_ADDRESS)
self.assertEqual(LB.vip, net.execute(LB))
mock_driver.allocate_vip.assert_called_once_with(
port_id=PORT_ID, network_id=NETWORK_ID, ip_address=IP_ADDRESS)
# revert
vip_mock = mock.MagicMock()
net.revert(vip_mock)
net.revert(vip_mock, LB)
mock_driver.deallocate_vip.assert_called_once_with(vip_mock)
def test_deallocate_vip(self, mock_driver):

View File

@ -17,6 +17,7 @@ import mock
from oslo_utils import uuidutils
from octavia.common import base_taskflow
from octavia.common import constants
from octavia.common import exceptions
from octavia.controller.worker import controller_worker
import octavia.tests.unit.base as base
@ -325,55 +326,43 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get):
# Test code path with an existing READY amphora
store = {constants.LOADBALANCER_ID: LB_ID}
cw = controller_worker.ControllerWorker()
cw.create_load_balancer(LB_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={'loadbalancer':
_load_balancer_mock}))
assert_called_once_with(_flow_mock, store=store))
_flow_mock.run.assert_called_once()
_flow_mock.storage.fetch.assert_called_once_with('amphora')
assert mock_get_create_amp_flow.called is False
fetch_None_mock = mock.MagicMock(side_effect=exceptions.
ComputeBuildException())
_flow_mock.storage.fetch = fetch_None_mock
cw.create_load_balancer(LB_ID)
self.assertFalse(mock_get_create_amp_for_lb_flow.called)
# Test code path with no existing READY amphora
_flow_mock.reset_mock()
mock_get_create_lb_flow.reset_mock()
mock_taskflow_load.reset_mock()
fetch_None_mock = mock.MagicMock(return_value=None)
_flow_mock.storage.fetch = fetch_None_mock
mock_eng = mock.Mock()
mock_taskflow_load.return_value = mock_eng
mock_eng.run.side_effect = [exceptions.NoReadyAmphoraeException, None]
cw.create_load_balancer(LB_ID)
base_taskflow.BaseTaskFlowEngine._taskflow_load.assert_has_calls([
mock.call(_flow_mock,
store={'loadbalancer': _load_balancer_mock}),
mock.call('TEST2',
store={'loadbalancer': _load_balancer_mock}),
], any_order=False)
# mock is showing odd calls, even persisting through a reset
# mock_taskflow_load.assert_has_calls([
# mock.call(_flow_mock, store=store),
# mock.call('TEST2', store=store),
# ], anyorder=False)
_flow_mock.run.assert_any_call()
mock_eng.run.assert_any_call()
assert _flow_mock.run.call_count == 2
self.assertEqual(mock_eng.run.call_count, 2)
_flow_mock.storage.fetch.assert_called_once_with('amphora')
_create_map_flow_mock.run = mock.MagicMock(side_effect=exceptions.
ComputeBuildException)
mock_taskflow_load.side_effect = [_flow_mock, _create_map_flow_mock]
mock_eng.reset()
mock_eng.run = mock.MagicMock(
side_effect=[exceptions.NoReadyAmphoraeException,
exceptions.ComputeBuildException])
self.assertRaises(exceptions.NoSuitableAmphoraException,
cw.create_load_balancer,