Get Me A Load Balancer Controller

Sets up the flows and some new tasks required to create all the
resources needed for an entire load balancer graph.  This includes
updating all listeners on all amphorae (depending on topology), and
plugging networks and setting up the routes and rules on every
amphora for a load balancer.  Luckily this mostly reuses tasks and
flows that were already created, though some new tasks and flows
were created specifically for handling many listeners.

Co-Authored-By: Trevor Vardeman <trevor.vardeman@rackspace.com>

Change-Id: I43a838e80281a37537e179cd8d4768f45e1ca7f1
This commit is contained in:
Trevor Vardeman 2016-02-14 17:52:46 -06:00
parent c7d6146e59
commit 4a1acafa12
13 changed files with 407 additions and 37 deletions

View File

@ -171,7 +171,9 @@ CREATE_LISTENER_FLOW = 'octavia-create-listener_flow'
PRE_CREATE_LOADBALANCER_FLOW = 'octavia-pre-create-loadbalancer-flow' PRE_CREATE_LOADBALANCER_FLOW = 'octavia-pre-create-loadbalancer-flow'
CREATE_SERVER_GROUP_FLOW = 'octavia-create-server-group-flow' CREATE_SERVER_GROUP_FLOW = 'octavia-create-server-group-flow'
UPDATE_LB_SERVERGROUPID_FLOW = 'octavia-update-lb-server-group-id-flow' UPDATE_LB_SERVERGROUPID_FLOW = 'octavia-update-lb-server-group-id-flow'
CREATE_LISTENERS_FLOW = 'octavia-create-all-listeners-flow'
CREATE_LOADBALANCER_FLOW = 'octavia-create-loadbalancer-flow' CREATE_LOADBALANCER_FLOW = 'octavia-create-loadbalancer-flow'
CREATE_LOADBALANCER_GRAPH_FLOW = 'octavia-create-loadbalancer-graph-flow'
CREATE_MEMBER_FLOW = 'octavia-create-member-flow' CREATE_MEMBER_FLOW = 'octavia-create-member-flow'
CREATE_POOL_FLOW = 'octavia-create-pool-flow' CREATE_POOL_FLOW = 'octavia-create-pool-flow'
CREATE_L7POLICY_FLOW = 'octavia-create-l7policy-flow' CREATE_L7POLICY_FLOW = 'octavia-create-l7policy-flow'
@ -234,6 +236,7 @@ GENERATE_SERVER_PEM_TASK = 'GenerateServerPEMTask'
# Task Names # Task Names
RELOAD_LB_AFTER_AMP_ASSOC = 'reload-lb-after-amp-assoc' RELOAD_LB_AFTER_AMP_ASSOC = 'reload-lb-after-amp-assoc'
RELOAD_LB_AFTER_AMP_ASSOC_FULL_GRAPH = 'reload-lb-after-amp-assoc-full-graph'
RELOAD_LB_AFTER_PLUG_VIP = 'reload-lb-after-plug-vip' RELOAD_LB_AFTER_PLUG_VIP = 'reload-lb-after-plug-vip'
NOVA_1 = '1.1' NOVA_1 = '1.1'

View File

@ -247,6 +247,29 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
with tf_logging.DynamicLoggingListener(update_listener_tf, log=LOG): with tf_logging.DynamicLoggingListener(update_listener_tf, log=LOG):
update_listener_tf.run() update_listener_tf.run()
def _get_create_load_balancer_flows(self, load_balancer, topology):
# if listeners exist then this was a request to create many resources
# at once, so different logic will be needed.
post_amp_prefix = 'post-amphora-association'
if load_balancer.listeners:
allocate_amphorae_flow, post_lb_amp_assoc_flow = (
self._lb_flows.get_create_load_balancer_graph_flows(
topology, post_amp_prefix
)
)
else:
allocate_amphorae_flow = (
self._lb_flows.get_create_load_balancer_flow(
topology=topology
)
)
post_lb_amp_assoc_flow = (
self._lb_flows.get_post_lb_amp_association_flow(
prefix=post_amp_prefix, topology=topology
)
)
return allocate_amphorae_flow, post_lb_amp_assoc_flow
def create_load_balancer(self, load_balancer_id): def create_load_balancer(self, load_balancer_id):
"""Creates a load balancer by allocating Amphorae. """Creates a load balancer by allocating Amphorae.
@ -277,14 +300,15 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
# blogan and sbalukoff asked to remove the else check here # blogan and sbalukoff asked to remove the else check here
# as it is also checked later in the flow create code # as it is also checked later in the flow create code
create_lb_tf = self._taskflow_load( lb = self._lb_repo.get(db_apis.get_session(), id=load_balancer_id)
self._lb_flows.get_create_load_balancer_flow( allocate_amphorae_flow, post_lb_amp_assoc_flow = (
topology=CONF.controller_worker.loadbalancer_topology), self._get_create_load_balancer_flows(lb, topology)
store=store) )
create_lb_tf = self._taskflow_load(allocate_amphorae_flow, store=store)
with tf_logging.DynamicLoggingListener( with tf_logging.DynamicLoggingListener(
create_lb_tf, log=LOG, create_lb_tf, log=LOG,
hide_inputs_outputs_of=self._exclude_result_logging_tasks): hide_inputs_outputs_of=self._exclude_result_logging_tasks):
create_lb_tf.run() create_lb_tf.run()
# Ideally the following flow should be integrated with the # Ideally the following flow should be integrated with the
@ -292,10 +316,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
# current version of taskflow as it flatten out the flows. # current version of taskflow as it flatten out the flows.
# Bug report: https://bugs.launchpad.net/taskflow/+bug/1479466 # Bug report: https://bugs.launchpad.net/taskflow/+bug/1479466
post_lb_amp_assoc = self._taskflow_load( post_lb_amp_assoc = self._taskflow_load(
self._lb_flows.get_post_lb_amp_association_flow( post_lb_amp_assoc_flow, store=store)
prefix='post-amphora-association',
topology=CONF.controller_worker.loadbalancer_topology),
store=store)
with tf_logging.DynamicLoggingListener(post_lb_amp_assoc, with tf_logging.DynamicLoggingListener(post_lb_amp_assoc,
log=LOG): log=LOG):
post_lb_amp_assoc.run() post_lb_amp_assoc.run()

View File

@ -40,6 +40,26 @@ class ListenerFlows(object):
constants.LISTENERS])) constants.LISTENERS]))
return create_listener_flow return create_listener_flow
def get_create_all_listeners_flow(self):
"""Create a flow to create all listeners
:returns: The flow for creating all listeners
"""
create_all_listeners_flow = linear_flow.Flow(
constants.CREATE_LISTENERS_FLOW)
create_all_listeners_flow.add(
database_tasks.GetListenersFromLoadbalancer(
requires=constants.LOADBALANCER,
provides=constants.LISTENERS))
create_all_listeners_flow.add(database_tasks.ReloadLoadBalancer(
requires=constants.LOADBALANCER_ID,
provides=constants.LOADBALANCER))
create_all_listeners_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
create_all_listeners_flow.add(network_tasks.UpdateVIP(
requires=constants.LOADBALANCER))
return create_all_listeners_flow
def get_delete_listener_flow(self): def get_delete_listener_flow(self):
"""Create a flow to delete a listener """Create a flow to delete a listener

View File

@ -22,6 +22,7 @@ from octavia.common import constants
from octavia.common import exceptions from octavia.common import exceptions
from octavia.controller.worker.flows import amphora_flows from octavia.controller.worker.flows import amphora_flows
from octavia.controller.worker.flows import listener_flows from octavia.controller.worker.flows import listener_flows
from octavia.controller.worker.flows import member_flows
from octavia.controller.worker.flows import pool_flows from octavia.controller.worker.flows import pool_flows
from octavia.controller.worker.tasks import amphora_driver_tasks from octavia.controller.worker.tasks import amphora_driver_tasks
from octavia.controller.worker.tasks import compute_tasks from octavia.controller.worker.tasks import compute_tasks
@ -43,6 +44,7 @@ class LoadBalancerFlows(object):
self.amp_flows = amphora_flows.AmphoraFlows() self.amp_flows = amphora_flows.AmphoraFlows()
self.listener_flows = listener_flows.ListenerFlows() self.listener_flows = listener_flows.ListenerFlows()
self.pool_flows = pool_flows.PoolFlows() self.pool_flows = pool_flows.PoolFlows()
self.member_flows = member_flows.MemberFlows()
def get_create_load_balancer_flow(self, topology): def get_create_load_balancer_flow(self, topology):
"""Creates a conditional graph flow that allocates a loadbalancer to """Creates a conditional graph flow that allocates a loadbalancer to
@ -102,6 +104,45 @@ class LoadBalancerFlows(object):
create_lb_flow_wrapper.add(lb_create_flow) create_lb_flow_wrapper.add(lb_create_flow)
return create_lb_flow_wrapper return create_lb_flow_wrapper
def get_create_load_balancer_graph_flows(self, topology, prefix):
allocate_amphorae_flow = self.get_create_load_balancer_flow(topology)
f_name = constants.CREATE_LOADBALANCER_GRAPH_FLOW
lb_create_graph_flow = linear_flow.Flow(f_name)
lb_create_graph_flow.add(
self.get_post_lb_amp_association_flow(prefix, topology)
)
lb_create_graph_flow.add(
database_tasks.ReloadLoadBalancer(
name=constants.RELOAD_LB_AFTER_AMP_ASSOC_FULL_GRAPH,
requires=constants.LOADBALANCER_ID,
provides=constants.LOADBALANCER
)
)
lb_create_graph_flow.add(
network_tasks.CalculateDelta(
requires=constants.LOADBALANCER, provides=constants.DELTAS
)
)
lb_create_graph_flow.add(
network_tasks.HandleNetworkDeltas(
requires=constants.DELTAS, provides=constants.ADDED_PORTS
)
)
lb_create_graph_flow.add(
amphora_driver_tasks.AmphoraePostNetworkPlug(
requires=(constants.LOADBALANCER, constants.ADDED_PORTS)
)
)
lb_create_graph_flow.add(
self.listener_flows.get_create_all_listeners_flow()
)
lb_create_graph_flow.add(database_tasks.MarkLBActiveInDB(
mark_listeners=True,
requires=constants.LOADBALANCER)
)
return allocate_amphorae_flow, lb_create_graph_flow
def get_post_lb_amp_association_flow(self, prefix, topology): def get_post_lb_amp_association_flow(self, prefix, topology):
"""Reload the loadbalancer and create networking subflows for """Reload the loadbalancer and create networking subflows for

View File

@ -52,15 +52,21 @@ class ListenersUpdate(BaseAmphoraTask):
def execute(self, loadbalancer, listeners): def execute(self, loadbalancer, listeners):
"""Execute updates per listener for an amphora.""" """Execute updates per listener for an amphora."""
for listener in listeners: for listener in listeners:
listener.load_balancer = loadbalancer
self.amphora_driver.update(listener, loadbalancer.vip) self.amphora_driver.update(listener, loadbalancer.vip)
def revert(self, listeners, *args, **kwargs): def revert(self, loadbalancer, *args, **kwargs):
"""Handle failed listeners updates.""" """Handle failed listeners updates."""
LOG.warn(_LW("Reverting listeners updates.")) LOG.warn(_LW("Reverting listeners updates."))
for listener in listeners: for listener in loadbalancer.listeners:
self.listener_repo.update(db_apis.get_session(), id=listener.id, try:
self.listener_repo.update(db_apis.get_session(),
id=listener.id,
provisioning_status=constants.ERROR) provisioning_status=constants.ERROR)
except Exception:
LOG.warn(_LW("Failed to update listener %s provisioning "
"status..."), listener.id)
return None return None
@ -112,8 +118,13 @@ class ListenersStart(BaseAmphoraTask):
LOG.warn(_LW("Reverting listeners starts.")) LOG.warn(_LW("Reverting listeners starts."))
for listener in listeners: for listener in listeners:
self.listener_repo.update(db_apis.get_session(), id=listener.id, try:
self.listener_repo.update(db_apis.get_session(),
id=listener.id,
provisioning_status=constants.ERROR) provisioning_status=constants.ERROR)
except Exception:
LOG.warn(_LW("Failed to update listener %s provisioning "
"status..."), listener.id)
return None return None

View File

@ -690,9 +690,21 @@ class MarkLBActiveInDB(BaseDatabaseTask):
Since sqlalchemy will likely retry by itself always revert if it fails Since sqlalchemy will likely retry by itself always revert if it fails
""" """
def __init__(self, mark_listeners=False, **kwargs):
super(MarkLBActiveInDB, self).__init__(**kwargs)
self.mark_listeners = mark_listeners
def execute(self, loadbalancer): def execute(self, loadbalancer):
"""Mark the load balancer as active in DB.""" """Mark the load balancer as active in DB."""
if self.mark_listeners:
LOG.debug("Marking all listeners of loadbalancer %s ACTIVE",
loadbalancer.id)
for listener in loadbalancer.listeners:
self.listener_repo.update(db_apis.get_session(),
listener.id,
provisioning_status=constants.ACTIVE)
LOG.info(_LI("Mark ACTIVE in DB for load balancer id: %s"), LOG.info(_LI("Mark ACTIVE in DB for load balancer id: %s"),
loadbalancer.id) loadbalancer.id)
self.loadbalancer_repo.update(db_apis.get_session(), self.loadbalancer_repo.update(db_apis.get_session(),
@ -702,6 +714,18 @@ class MarkLBActiveInDB(BaseDatabaseTask):
def revert(self, loadbalancer, *args, **kwargs): def revert(self, loadbalancer, *args, **kwargs):
"""Mark the load balancer as broken and ready to be cleaned up.""" """Mark the load balancer as broken and ready to be cleaned up."""
if self.mark_listeners:
LOG.debug("Marking all listeners of loadbalancer %s ERROR",
loadbalancer.id)
for listener in loadbalancer.listeners:
try:
self.listener_repo.update(
db_apis.get_session(), listener.id,
provisioning_status=constants.ERROR)
except Exception:
LOG.warn(_LW("Error updating listener %s provisioning "
"status"), listener.id)
LOG.warn(_LW("Reverting mark load balancer deleted in DB " LOG.warn(_LW("Reverting mark load balancer deleted in DB "
"for load balancer id %s"), loadbalancer.id) "for load balancer id %s"), loadbalancer.id)
self.loadbalancer_repo.update(db_apis.get_session(), self.loadbalancer_repo.update(db_apis.get_session(),
@ -811,8 +835,12 @@ class MarkLBAndListenersActiveInDB(BaseDatabaseTask):
loadbalancer.id, loadbalancer.id,
provisioning_status=constants.ERROR) provisioning_status=constants.ERROR)
for listener in listeners: for listener in listeners:
try:
self.listener_repo.update(db_apis.get_session(), listener.id, self.listener_repo.update(db_apis.get_session(), listener.id,
provisioning_status=constants.ERROR) provisioning_status=constants.ERROR)
except Exception:
LOG.warn(_LW("Failed to update listener %s provisioning "
"status..."), listener.id)
class MarkListenerActiveInDB(BaseDatabaseTask): class MarkListenerActiveInDB(BaseDatabaseTask):

View File

@ -834,16 +834,13 @@ class L7PolicyRepository(BaseRepository):
session.add(l7policy) session.add(l7policy)
session.flush() session.flush()
listener = (session.query(models.Listener).
filter_by(id=l7policy.listener_id).first())
session.refresh(listener)
# Must be done outside the transaction which creates the L7Policy # Must be done outside the transaction which creates the L7Policy
listener = (session.query(models.Listener). listener = (session.query(models.Listener).
filter_by(id=l7policy.listener_id).first()) filter_by(id=l7policy.listener_id).first())
# Immediate refresh, as we have found that sqlalchemy will sometimes # Immediate refresh, as we have found that sqlalchemy will sometimes
# cache the above query # cache the above query
session.refresh(listener) session.refresh(listener)
session.refresh(l7policy)
if position is not None and position < len(listener.l7policies) + 1: if position is not None and position < len(listener.l7policies) + 1:
with session.begin(subtransactions=True): with session.begin(subtransactions=True):

View File

@ -28,6 +28,8 @@ AUTH_VERSION = '2'
class TestAmphoraFlows(base.TestCase): class TestAmphoraFlows(base.TestCase):
def setUp(self): def setUp(self):
super(TestAmphoraFlows, self).setUp()
old_amp_driver = cfg.CONF.controller_worker.amphora_driver
cfg.CONF.set_override('amphora_driver', 'amphora_haproxy_rest_driver', cfg.CONF.set_override('amphora_driver', 'amphora_haproxy_rest_driver',
group='controller_worker') group='controller_worker')
cfg.CONF.set_override('enable_anti_affinity', False, cfg.CONF.set_override('enable_anti_affinity', False,
@ -36,7 +38,8 @@ class TestAmphoraFlows(base.TestCase):
conf = oslo_fixture.Config(cfg.CONF) conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="keystone_authtoken", auth_version=AUTH_VERSION) conf.config(group="keystone_authtoken", auth_version=AUTH_VERSION)
super(TestAmphoraFlows, self).setUp() self.addCleanup(cfg.CONF.set_override, 'amphora_driver',
old_amp_driver, group='controller_worker')
def test_get_create_amphora_flow(self): def test_get_create_amphora_flow(self):
@ -53,8 +56,6 @@ class TestAmphoraFlows(base.TestCase):
self.assertEqual(0, len(amp_flow.requires)) self.assertEqual(0, len(amp_flow.requires))
def test_get_create_amphora_flow_cert(self): def test_get_create_amphora_flow_cert(self):
cfg.CONF.set_override('amphora_driver', 'amphora_haproxy_rest_driver',
group='controller_worker')
self.AmpFlow = amphora_flows.AmphoraFlows() self.AmpFlow = amphora_flows.AmphoraFlows()
amp_flow = self.AmpFlow.get_create_amphora_flow() amp_flow = self.AmpFlow.get_create_amphora_flow()
@ -69,8 +70,6 @@ class TestAmphoraFlows(base.TestCase):
self.assertEqual(0, len(amp_flow.requires)) self.assertEqual(0, len(amp_flow.requires))
def test_get_create_amphora_for_lb_flow(self): def test_get_create_amphora_for_lb_flow(self):
cfg.CONF.set_override('amphora_driver', 'amphora_haproxy_rest_driver',
group='controller_worker')
amp_flow = self.AmpFlow._get_create_amp_for_lb_subflow( amp_flow = self.AmpFlow._get_create_amp_for_lb_subflow(
'SOMEPREFIX', constants.ROLE_STANDALONE) 'SOMEPREFIX', constants.ROLE_STANDALONE)
@ -155,8 +154,6 @@ class TestAmphoraFlows(base.TestCase):
self.assertEqual(2, len(amp_flow.requires)) self.assertEqual(2, len(amp_flow.requires))
def test_get_cert_backup_create_amphora_for_lb_flow(self): def test_get_cert_backup_create_amphora_for_lb_flow(self):
cfg.CONF.set_override('amphora_driver', 'amphora_haproxy_rest_driver',
group='controller_worker')
self.AmpFlow = amphora_flows.AmphoraFlows() self.AmpFlow = amphora_flows.AmphoraFlows()
amp_flow = self.AmpFlow._get_create_amp_for_lb_subflow( amp_flow = self.AmpFlow._get_create_amp_for_lb_subflow(
@ -176,8 +173,6 @@ class TestAmphoraFlows(base.TestCase):
self.assertEqual(1, len(amp_flow.requires)) self.assertEqual(1, len(amp_flow.requires))
def test_get_cert_bogus_create_amphora_for_lb_flow(self): def test_get_cert_bogus_create_amphora_for_lb_flow(self):
cfg.CONF.set_override('amphora_driver', 'amphora_haproxy_rest_driver',
group='controller_worker')
self.AmpFlow = amphora_flows.AmphoraFlows() self.AmpFlow = amphora_flows.AmphoraFlows()
amp_flow = self.AmpFlow._get_create_amp_for_lb_subflow( amp_flow = self.AmpFlow._get_create_amp_for_lb_subflow(
@ -337,8 +332,6 @@ class TestAmphoraFlows(base.TestCase):
self.assertEqual(12, len(amp_flow.provides)) self.assertEqual(12, len(amp_flow.provides))
def test_cert_rotate_amphora_flow(self): def test_cert_rotate_amphora_flow(self):
cfg.CONF.set_override('amphora_driver', 'amphora_haproxy_rest_driver',
group='controller_worker')
self.AmpFlow = amphora_flows.AmphoraFlows() self.AmpFlow = amphora_flows.AmphoraFlows()
amp_rotate_flow = self.AmpFlow.cert_rotate_amphora_flow() amp_rotate_flow = self.AmpFlow.cert_rotate_amphora_flow()

View File

@ -76,3 +76,12 @@ class TestListenerFlows(base.TestCase):
self.assertEqual(4, len(listener_flow.requires)) self.assertEqual(4, len(listener_flow.requires))
self.assertEqual(0, len(listener_flow.provides)) self.assertEqual(0, len(listener_flow.provides))
def test_get_create_all_listeners_flow(self):
listeners_flow = self.ListenerFlow.get_create_all_listeners_flow()
self.assertIsInstance(listeners_flow, flow.Flow)
self.assertIn(constants.LOADBALANCER, listeners_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, listeners_flow.requires)
self.assertIn(constants.LOADBALANCER, listeners_flow.provides)
self.assertEqual(2, len(listeners_flow.requires))
self.assertEqual(2, len(listeners_flow.provides))

View File

@ -27,10 +27,15 @@ import octavia.tests.unit.base as base
class TestLoadBalancerFlows(base.TestCase): class TestLoadBalancerFlows(base.TestCase):
def setUp(self): def setUp(self):
super(TestLoadBalancerFlows, self).setUp()
old_amp_driver = cfg.CONF.controller_worker.amphora_driver
cfg.CONF.set_override('amphora_driver', 'amphora_haproxy_rest_driver',
group='controller_worker')
self.LBFlow = load_balancer_flows.LoadBalancerFlows() self.LBFlow = load_balancer_flows.LoadBalancerFlows()
conf = oslo_fixture.Config(cfg.CONF) conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="nova", enable_anti_affinity=False) conf.config(group="nova", enable_anti_affinity=False)
super(TestLoadBalancerFlows, self).setUp() self.addCleanup(cfg.CONF.set_override, 'amphora_driver',
old_amp_driver, group='controller_worker')
def test_get_create_load_balancer_flow(self): def test_get_create_load_balancer_flow(self):
amp_flow = self.LBFlow.get_create_load_balancer_flow( amp_flow = self.LBFlow.get_create_load_balancer_flow(
@ -164,3 +169,81 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertEqual(4, len(amp_flow.provides)) self.assertEqual(4, len(amp_flow.provides))
self.assertEqual(2, len(amp_flow.requires)) self.assertEqual(2, len(amp_flow.requires))
# Test mark_active=False
amp_flow = self.LBFlow.get_post_lb_amp_association_flow(
'123', constants.TOPOLOGY_ACTIVE_STANDBY)
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
self.assertIn(constants.UPDATE_DICT, amp_flow.requires)
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertEqual(4, len(amp_flow.provides))
self.assertEqual(2, len(amp_flow.requires))
def test_get_create_load_balancer_graph_flows(self):
allocate_amp_flow, post_amp_flow = (
self.LBFlow.get_create_load_balancer_graph_flows(
constants.TOPOLOGY_SINGLE, '123'
)
)
self.assertIsInstance(allocate_amp_flow, flow.Flow)
self.assertIn(constants.LOADBALANCER_ID, allocate_amp_flow.requires)
self.assertIn(constants.AMPHORA, allocate_amp_flow.provides)
self.assertIn(constants.AMPHORA_ID, allocate_amp_flow.provides)
self.assertIn(constants.COMPUTE_ID, allocate_amp_flow.provides)
self.assertIn(constants.COMPUTE_OBJ, allocate_amp_flow.provides)
self.assertEqual(1, len(allocate_amp_flow.requires))
self.assertEqual(5, len(allocate_amp_flow.provides),
allocate_amp_flow.provides)
self.assertIsInstance(post_amp_flow, flow.Flow)
self.assertIn(constants.LOADBALANCER_ID, post_amp_flow.requires)
self.assertIn(constants.UPDATE_DICT, post_amp_flow.requires)
self.assertIn(constants.LOADBALANCER, post_amp_flow.provides)
self.assertIn(constants.DELTAS, post_amp_flow.provides)
self.assertIn(constants.ADDED_PORTS, post_amp_flow.provides)
self.assertIn(constants.VIP, post_amp_flow.provides)
self.assertIn(constants.AMPS_DATA, post_amp_flow.provides)
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG,
post_amp_flow.provides)
self.assertEqual(2, len(post_amp_flow.requires))
self.assertEqual(7, len(post_amp_flow.provides))
# Test Active/Standby
allocate_amp_flow, post_amp_flow = (
self.LBFlow.get_create_load_balancer_graph_flows(
constants.TOPOLOGY_ACTIVE_STANDBY, '123'
)
)
self.assertIsInstance(allocate_amp_flow, flow.Flow)
self.assertIn(constants.LOADBALANCER_ID, allocate_amp_flow.requires)
self.assertIn(constants.AMPHORA, allocate_amp_flow.provides)
self.assertIn(constants.AMPHORA_ID, allocate_amp_flow.provides)
self.assertIn(constants.COMPUTE_ID, allocate_amp_flow.provides)
self.assertIn(constants.COMPUTE_OBJ, allocate_amp_flow.provides)
self.assertEqual(1, len(allocate_amp_flow.requires))
self.assertEqual(5, len(allocate_amp_flow.provides))
self.assertIsInstance(post_amp_flow, flow.Flow)
self.assertIn(constants.LOADBALANCER_ID, post_amp_flow.requires)
self.assertIn(constants.UPDATE_DICT, post_amp_flow.requires)
self.assertIn(constants.LOADBALANCER, post_amp_flow.provides)
self.assertIn(constants.DELTAS, post_amp_flow.provides)
self.assertIn(constants.ADDED_PORTS, post_amp_flow.provides)
self.assertIn(constants.VIP, post_amp_flow.provides)
self.assertIn(constants.AMPS_DATA, post_amp_flow.provides)
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG,
post_amp_flow.provides)
self.assertEqual(2, len(post_amp_flow.requires))
self.assertEqual(7, len(post_amp_flow.provides))

View File

@ -18,6 +18,7 @@ from oslo_utils import uuidutils
from taskflow.types import failure from taskflow.types import failure
from octavia.common import constants from octavia.common import constants
from octavia.common import data_models
from octavia.controller.worker.tasks import amphora_driver_tasks from octavia.controller.worker.tasks import amphora_driver_tasks
from octavia.db import repositories as repo from octavia.db import repositories as repo
import octavia.tests.unit.base as base import octavia.tests.unit.base as base
@ -31,10 +32,11 @@ LB_ID = uuidutils.generate_uuid()
_amphora_mock = mock.MagicMock() _amphora_mock = mock.MagicMock()
_amphora_mock.id = AMP_ID _amphora_mock.id = AMP_ID
_amphora_mock.status = constants.AMPHORA_ALLOCATED _amphora_mock.status = constants.AMPHORA_ALLOCATED
_listener_mock = mock.MagicMock()
_listener_mock.id = LISTENER_ID
_load_balancer_mock = mock.MagicMock() _load_balancer_mock = mock.MagicMock()
_load_balancer_mock.id = LB_ID _load_balancer_mock.id = LB_ID
_listener_mock = mock.MagicMock()
_listener_mock.id = LISTENER_ID
_load_balancer_mock.listeners = [_listener_mock]
_vip_mock = mock.MagicMock() _vip_mock = mock.MagicMock()
_load_balancer_mock.vip = _vip_mock _load_balancer_mock.vip = _vip_mock
_LB_mock = mock.MagicMock() _LB_mock = mock.MagicMock()
@ -76,13 +78,45 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_driver.update.assert_called_once_with(_listener_mock, _vip_mock) mock_driver.update.assert_called_once_with(_listener_mock, _vip_mock)
# Test the revert # Test the revert
amp = listener_update_obj.revert([_listener_mock]) amp = listener_update_obj.revert(_load_balancer_mock)
repo.ListenerRepository.update.assert_called_once_with( repo.ListenerRepository.update.assert_called_once_with(
_session_mock, _session_mock,
id=LISTENER_ID, id=LISTENER_ID,
provisioning_status=constants.ERROR) provisioning_status=constants.ERROR)
self.assertIsNone(amp) self.assertIsNone(amp)
def test_listeners_update(self,
mock_driver,
mock_generate_uuid,
mock_log,
mock_get_session,
mock_listener_repo_get,
mock_listener_repo_update,
mock_amphora_repo_update):
listeners_update_obj = amphora_driver_tasks.ListenersUpdate()
listeners = [data_models.Listener(id='listener1'),
data_models.Listener(id='listener2')]
vip = data_models.Vip(ip_address='10.0.0.1')
lb = data_models.LoadBalancer(id='lb1', listeners=listeners, vip=vip)
listeners_update_obj.execute(lb, listeners)
mock_driver.update.assert_has_calls([mock.call(listeners[0], vip),
mock.call(listeners[1], vip)])
self.assertEqual(2, mock_driver.update.call_count)
self.assertIsNotNone(listeners[0].load_balancer)
self.assertIsNotNone(listeners[1].load_balancer)
# Test the revert
amp = listeners_update_obj.revert(lb)
expected_db_calls = [mock.call(_session_mock,
id=listeners[0].id,
provisioning_status=constants.ERROR),
mock.call(_session_mock,
id=listeners[1].id,
provisioning_status=constants.ERROR)]
repo.ListenerRepository.update.has_calls(expected_db_calls)
self.assertEqual(2, repo.ListenerRepository.update.call_count)
self.assertIsNone(amp)
def test_listener_stop(self, def test_listener_stop(self,
mock_driver, mock_driver,
mock_generate_uuid, mock_generate_uuid,

View File

@ -22,6 +22,7 @@ from sqlalchemy.orm import exc
from taskflow.types import failure from taskflow.types import failure
from octavia.common import constants from octavia.common import constants
from octavia.common import data_models
from octavia.controller.worker.tasks import database_tasks from octavia.controller.worker.tasks import database_tasks
from octavia.db import repositories as repo from octavia.db import repositories as repo
import octavia.tests.unit.base as base import octavia.tests.unit.base as base
@ -932,6 +933,7 @@ class TestDatabaseTasks(base.TestCase):
'TEST', 'TEST',
LB_ID, LB_ID,
provisioning_status=constants.ACTIVE) provisioning_status=constants.ACTIVE)
self.assertEqual(0, repo.ListenerRepository.update.call_count)
# Test the revert # Test the revert
@ -942,6 +944,47 @@ class TestDatabaseTasks(base.TestCase):
'TEST', 'TEST',
LB_ID, LB_ID,
provisioning_status=constants.ERROR) provisioning_status=constants.ERROR)
self.assertEqual(0, repo.ListenerRepository.update.call_count)
def test_mark_LB_active_in_db_and_listeners(self,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
listeners = [data_models.Listener(id='listener1'),
data_models.Listener(id='listener2')]
lb = data_models.LoadBalancer(id=LB_ID, listeners=listeners)
mark_lb_active = database_tasks.MarkLBActiveInDB(mark_listeners=True)
mark_lb_active.execute(lb)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
lb.id,
provisioning_status=constants.ACTIVE)
self.assertEqual(2, repo.ListenerRepository.update.call_count)
repo.ListenerRepository.update.has_calls(
[mock.call('TEST', listeners[0].id,
provisioning_status=constants.ACTIVE),
mock.call('TEST', listeners[1].id,
provisioning_status=constants.ACTIVE)])
mock_loadbalancer_repo_update.reset_mock()
mock_listener_repo_update.reset_mock()
mark_lb_active.revert(lb)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
lb.id,
provisioning_status=constants.ERROR)
self.assertEqual(2, repo.ListenerRepository.update.call_count)
repo.ListenerRepository.update.has_calls(
[mock.call('TEST', listeners[0].id,
provisioning_status=constants.ERROR),
mock.call('TEST', listeners[1].id,
provisioning_status=constants.ERROR)])
def test_mark_LB_deleted_in_db(self, def test_mark_LB_deleted_in_db(self,
mock_generate_uuid, mock_generate_uuid,

View File

@ -19,6 +19,7 @@ from oslo_utils import uuidutils
from octavia.common import base_taskflow from octavia.common import base_taskflow
from octavia.common import constants from octavia.common import constants
from octavia.common import data_models
from octavia.controller.worker import controller_worker from octavia.controller.worker import controller_worker
import octavia.tests.unit.base as base import octavia.tests.unit.base as base
@ -403,6 +404,7 @@ class TestControllerWorker(base.TestCase):
mock_get_get_post_lb_amp_association_flow.return_value = _post_flow mock_get_get_post_lb_amp_association_flow.return_value = _post_flow
store = {constants.LOADBALANCER_ID: LB_ID, store = {constants.LOADBALANCER_ID: LB_ID,
'update_dict': {'topology': 'SINGLE'}} 'update_dict': {'topology': 'SINGLE'}}
setattr(mock_lb_repo_get.return_value, 'listeners', [])
cw = controller_worker.ControllerWorker() cw = controller_worker.ControllerWorker()
cw.create_load_balancer(LB_ID) cw.create_load_balancer(LB_ID)
@ -440,6 +442,91 @@ class TestControllerWorker(base.TestCase):
mock_eng.run.assert_any_call() mock_eng.run.assert_any_call()
mock_eng_post.run.assert_any_call() mock_eng_post.run.assert_any_call()
@mock.patch('octavia.controller.worker.flows.load_balancer_flows.'
'LoadBalancerFlows.get_post_lb_amp_association_flow')
@mock.patch('octavia.controller.worker.flows.load_balancer_flows.'
'LoadBalancerFlows.get_create_load_balancer_flow')
@mock.patch('octavia.controller.worker.flows.load_balancer_flows.'
'LoadBalancerFlows.get_create_load_balancer_graph_flows')
def test_create_load_balancer_full_graph(
self,
mock_get_create_load_balancer_graph_flows,
mock_get_create_load_balancer_flow,
mock_get_post_lb_amp_association_flow,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
mock_amp_repo_get):
CONF.set_override(group='controller_worker',
name='loadbalancer_topology',
override=constants.TOPOLOGY_SINGLE,
enforce_type=True)
listeners = [data_models.Listener(id='listener1'),
data_models.Listener(id='listener2')]
lb = data_models.LoadBalancer(id=LB_ID, listeners=listeners)
mock_lb_repo_get.return_value = lb
mock_eng = mock.Mock()
mock_eng_post = mock.Mock()
mock_taskflow_load.side_effect = [mock_eng, mock_eng_post]
_post_flow = mock.MagicMock()
mock_get_create_load_balancer_graph_flows.return_value = (
_flow_mock, _post_flow
)
store = {constants.LOADBALANCER_ID: LB_ID,
'update_dict': {'topology': 'SINGLE'}}
cw = controller_worker.ControllerWorker()
cw.create_load_balancer(LB_ID)
calls = [mock.call(_flow_mock, store=store),
mock.call(_post_flow, store=store)]
mock_taskflow_load.assert_has_calls(calls, any_order=True)
mock_eng.run.assert_any_call()
mock_eng_post.run.assert_any_call()
mock_get_create_load_balancer_graph_flows.assert_called_once_with(
'SINGLE', 'post-amphora-association'
)
self.assertFalse(mock_get_create_load_balancer_flow.called)
self.assertFalse(mock_get_post_lb_amp_association_flow.called)
# Test code path for active standby full lb graph creation
CONF.set_override(group='controller_worker',
name='loadbalancer_topology',
override=constants.TOPOLOGY_ACTIVE_STANDBY)
_flow_mock.reset_mock()
mock_get_create_load_balancer_graph_flows.reset_mock()
mock_taskflow_load.reset_mock()
mock_eng = mock.Mock()
mock_eng_post = mock.Mock()
mock_taskflow_load.side_effect = [mock_eng, mock_eng_post]
_post_flow = mock.MagicMock()
mock_get_create_load_balancer_graph_flows.return_value = (
_flow_mock, _post_flow
)
store = {constants.LOADBALANCER_ID: LB_ID,
'update_dict': {'topology': 'ACTIVE_STANDBY'}}
cw = controller_worker.ControllerWorker()
cw.create_load_balancer(LB_ID)
calls = [mock.call(_flow_mock, store=store),
mock.call(_post_flow, store=store)]
mock_taskflow_load.assert_has_calls(calls, any_order=True)
mock_eng.run.assert_any_call()
mock_eng_post.run.assert_any_call()
mock_get_create_load_balancer_graph_flows.assert_called_once_with(
'ACTIVE_STANDBY', 'post-amphora-association'
)
self.assertFalse(mock_get_create_load_balancer_flow.called)
self.assertFalse(mock_get_post_lb_amp_association_flow.called)
@mock.patch('octavia.controller.worker.flows.load_balancer_flows.' @mock.patch('octavia.controller.worker.flows.load_balancer_flows.'
'LoadBalancerFlows.get_delete_load_balancer_flow', 'LoadBalancerFlows.get_delete_load_balancer_flow',
return_value=(_flow_mock, {'test': 'test'})) return_value=(_flow_mock, {'test': 'test'}))