Rewrite create_load_balancer flow
Join two flows (create_lb_tf, post_lb_amp_assoc) into one Change-Id: I68deb7a9795cbb135dbbef5dea0bfc6aa89db5b3 Closes-Bug: #1552599
This commit is contained in:
parent
31aeced62f
commit
aae434e19a
@ -247,29 +247,6 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
|||||||
with tf_logging.DynamicLoggingListener(update_listener_tf, log=LOG):
|
with tf_logging.DynamicLoggingListener(update_listener_tf, log=LOG):
|
||||||
update_listener_tf.run()
|
update_listener_tf.run()
|
||||||
|
|
||||||
def _get_create_load_balancer_flows(self, load_balancer, topology):
|
|
||||||
# if listeners exist then this was a request to create many resources
|
|
||||||
# at once, so different logic will be needed.
|
|
||||||
post_amp_prefix = 'post-amphora-association'
|
|
||||||
if load_balancer.listeners:
|
|
||||||
allocate_amphorae_flow, post_lb_amp_assoc_flow = (
|
|
||||||
self._lb_flows.get_create_load_balancer_graph_flows(
|
|
||||||
topology, post_amp_prefix
|
|
||||||
)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
allocate_amphorae_flow = (
|
|
||||||
self._lb_flows.get_create_load_balancer_flow(
|
|
||||||
topology=topology
|
|
||||||
)
|
|
||||||
)
|
|
||||||
post_lb_amp_assoc_flow = (
|
|
||||||
self._lb_flows.get_post_lb_amp_association_flow(
|
|
||||||
prefix=post_amp_prefix, topology=topology
|
|
||||||
)
|
|
||||||
)
|
|
||||||
return allocate_amphorae_flow, post_lb_amp_assoc_flow
|
|
||||||
|
|
||||||
def create_load_balancer(self, load_balancer_id):
|
def create_load_balancer(self, load_balancer_id):
|
||||||
"""Creates a load balancer by allocating Amphorae.
|
"""Creates a load balancer by allocating Amphorae.
|
||||||
|
|
||||||
@ -286,30 +263,20 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
|||||||
|
|
||||||
topology = CONF.controller_worker.loadbalancer_topology
|
topology = CONF.controller_worker.loadbalancer_topology
|
||||||
|
|
||||||
store[constants.UPDATE_DICT] = {constants.LOADBALANCER_TOPOLOGY:
|
store[constants.UPDATE_DICT] = {
|
||||||
topology}
|
constants.LOADBALANCER_TOPOLOGY: topology
|
||||||
|
}
|
||||||
|
|
||||||
lb = self._lb_repo.get(db_apis.get_session(), id=load_balancer_id)
|
lb = self._lb_repo.get(db_apis.get_session(), id=load_balancer_id)
|
||||||
allocate_amphorae_flow, post_lb_amp_assoc_flow = (
|
create_lb_flow = self._lb_flows.get_create_load_balancer_flow(
|
||||||
self._get_create_load_balancer_flows(lb, topology)
|
topology=topology, listeners=lb.listeners)
|
||||||
)
|
|
||||||
|
|
||||||
create_lb_tf = self._taskflow_load(allocate_amphorae_flow, store=store)
|
create_lb_tf = self._taskflow_load(create_lb_flow, store=store)
|
||||||
with tf_logging.DynamicLoggingListener(
|
with tf_logging.DynamicLoggingListener(
|
||||||
create_lb_tf, log=LOG,
|
create_lb_tf, log=LOG,
|
||||||
hide_inputs_outputs_of=self._exclude_result_logging_tasks):
|
hide_inputs_outputs_of=self._exclude_result_logging_tasks):
|
||||||
create_lb_tf.run()
|
create_lb_tf.run()
|
||||||
|
|
||||||
# Ideally the following flow should be integrated with the
|
|
||||||
# create_active_standby flow. This is not possible with the
|
|
||||||
# current version of taskflow as it flatten out the flows.
|
|
||||||
# Bug report: https://bugs.launchpad.net/taskflow/+bug/1479466
|
|
||||||
post_lb_amp_assoc = self._taskflow_load(
|
|
||||||
post_lb_amp_assoc_flow, store=store)
|
|
||||||
with tf_logging.DynamicLoggingListener(post_lb_amp_assoc,
|
|
||||||
log=LOG):
|
|
||||||
post_lb_amp_assoc.run()
|
|
||||||
|
|
||||||
def delete_load_balancer(self, load_balancer_id, cascade=False):
|
def delete_load_balancer(self, load_balancer_id, cascade=False):
|
||||||
"""Deletes a load balancer by de-allocating Amphorae.
|
"""Deletes a load balancer by de-allocating Amphorae.
|
||||||
|
|
||||||
|
@ -46,102 +46,108 @@ class LoadBalancerFlows(object):
|
|||||||
self.pool_flows = pool_flows.PoolFlows()
|
self.pool_flows = pool_flows.PoolFlows()
|
||||||
self.member_flows = member_flows.MemberFlows()
|
self.member_flows = member_flows.MemberFlows()
|
||||||
|
|
||||||
def get_create_load_balancer_flow(self, topology):
|
def get_create_load_balancer_flow(self, topology, listeners=None):
|
||||||
"""Creates a conditional graph flow that allocates a loadbalancer to
|
"""Creates a conditional graph flow that allocates a loadbalancer to
|
||||||
|
|
||||||
two spare amphorae.
|
two spare amphorae.
|
||||||
:raises InvalidTopology: Invalid topology specified
|
:raises InvalidTopology: Invalid topology specified
|
||||||
:return: The graph flow for creating a loadbalancer.
|
:return: The graph flow for creating a loadbalancer.
|
||||||
"""
|
"""
|
||||||
# create a linear flow as a wrapper
|
|
||||||
lf_name = constants.PRE_CREATE_LOADBALANCER_FLOW
|
|
||||||
create_lb_flow_wrapper = linear_flow.Flow(lf_name)
|
|
||||||
|
|
||||||
f_name = constants.CREATE_LOADBALANCER_FLOW
|
f_name = constants.CREATE_LOADBALANCER_FLOW
|
||||||
lb_create_flow = unordered_flow.Flow(f_name)
|
lb_create_flow = linear_flow.Flow(f_name)
|
||||||
|
|
||||||
if topology == constants.TOPOLOGY_ACTIVE_STANDBY:
|
if topology == constants.TOPOLOGY_ACTIVE_STANDBY:
|
||||||
# When we boot up amphora for an active/standby topology,
|
lb_create_flow.add(*self._create_active_standby_topology())
|
||||||
# we should leverage the Nova anti-affinity capabilities
|
|
||||||
# to place the amphora on different hosts, also we need to check
|
|
||||||
# if anti-affinity-flag is enabled or not:
|
|
||||||
anti_affinity = CONF.nova.enable_anti_affinity
|
|
||||||
if anti_affinity:
|
|
||||||
# we need to create a server group first
|
|
||||||
create_lb_flow_wrapper.add(
|
|
||||||
compute_tasks.NovaServerGroupCreate(
|
|
||||||
name=lf_name + '-' +
|
|
||||||
constants.CREATE_SERVER_GROUP_FLOW,
|
|
||||||
requires=(constants.LOADBALANCER_ID),
|
|
||||||
provides=constants.SERVER_GROUP_ID))
|
|
||||||
|
|
||||||
# update server group id in lb table
|
|
||||||
create_lb_flow_wrapper.add(
|
|
||||||
database_tasks.UpdateLBServerGroupInDB(
|
|
||||||
name=lf_name + '-' +
|
|
||||||
constants.UPDATE_LB_SERVERGROUPID_FLOW,
|
|
||||||
requires=(constants.LOADBALANCER_ID,
|
|
||||||
constants.SERVER_GROUP_ID)))
|
|
||||||
|
|
||||||
master_amp_sf = self.amp_flows.get_amphora_for_lb_subflow(
|
|
||||||
prefix=constants.ROLE_MASTER, role=constants.ROLE_MASTER)
|
|
||||||
|
|
||||||
backup_amp_sf = self.amp_flows.get_amphora_for_lb_subflow(
|
|
||||||
prefix=constants.ROLE_BACKUP, role=constants.ROLE_BACKUP)
|
|
||||||
|
|
||||||
lb_create_flow.add(master_amp_sf, backup_amp_sf)
|
|
||||||
|
|
||||||
elif topology == constants.TOPOLOGY_SINGLE:
|
elif topology == constants.TOPOLOGY_SINGLE:
|
||||||
amphora_sf = self.amp_flows.get_amphora_for_lb_subflow(
|
lb_create_flow.add(*self._create_single_topology())
|
||||||
prefix=constants.ROLE_STANDALONE,
|
|
||||||
role=constants.ROLE_STANDALONE)
|
|
||||||
lb_create_flow.add(amphora_sf)
|
|
||||||
else:
|
else:
|
||||||
LOG.error(_LE("Unknown topology: %s. Unable to build load "
|
LOG.error(_LE("Unknown topology: %s. Unable to build load "
|
||||||
"balancer."), topology)
|
"balancer."), topology)
|
||||||
raise exceptions.InvalidTopology(topology=topology)
|
raise exceptions.InvalidTopology(topology=topology)
|
||||||
|
|
||||||
create_lb_flow_wrapper.add(lb_create_flow)
|
post_amp_prefix = constants.POST_LB_AMP_ASSOCIATION_SUBFLOW
|
||||||
return create_lb_flow_wrapper
|
lb_create_flow.add(
|
||||||
|
self.get_post_lb_amp_association_flow(post_amp_prefix, topology))
|
||||||
|
|
||||||
def get_create_load_balancer_graph_flows(self, topology, prefix):
|
if listeners:
|
||||||
allocate_amphorae_flow = self.get_create_load_balancer_flow(topology)
|
lb_create_flow.add(*self._create_listeners_flow())
|
||||||
f_name = constants.CREATE_LOADBALANCER_GRAPH_FLOW
|
|
||||||
lb_create_graph_flow = linear_flow.Flow(f_name)
|
return lb_create_flow
|
||||||
lb_create_graph_flow.add(
|
|
||||||
self.get_post_lb_amp_association_flow(prefix, topology)
|
def _create_single_topology(self):
|
||||||
)
|
return (self.amp_flows.get_amphora_for_lb_subflow(
|
||||||
lb_create_graph_flow.add(
|
prefix=constants.ROLE_STANDALONE,
|
||||||
|
role=constants.ROLE_STANDALONE), )
|
||||||
|
|
||||||
|
def _create_active_standby_topology(
|
||||||
|
self, lf_name=constants.CREATE_LOADBALANCER_FLOW):
|
||||||
|
# When we boot up amphora for an active/standby topology,
|
||||||
|
# we should leverage the Nova anti-affinity capabilities
|
||||||
|
# to place the amphora on different hosts, also we need to check
|
||||||
|
# if anti-affinity-flag is enabled or not:
|
||||||
|
anti_affinity = CONF.nova.enable_anti_affinity
|
||||||
|
flows = []
|
||||||
|
if anti_affinity:
|
||||||
|
# we need to create a server group first
|
||||||
|
flows.append(
|
||||||
|
compute_tasks.NovaServerGroupCreate(
|
||||||
|
name=lf_name + '-' +
|
||||||
|
constants.CREATE_SERVER_GROUP_FLOW,
|
||||||
|
requires=(constants.LOADBALANCER_ID),
|
||||||
|
provides=constants.SERVER_GROUP_ID))
|
||||||
|
|
||||||
|
# update server group id in lb table
|
||||||
|
flows.append(
|
||||||
|
database_tasks.UpdateLBServerGroupInDB(
|
||||||
|
name=lf_name + '-' +
|
||||||
|
constants.UPDATE_LB_SERVERGROUPID_FLOW,
|
||||||
|
requires=(constants.LOADBALANCER_ID,
|
||||||
|
constants.SERVER_GROUP_ID)))
|
||||||
|
|
||||||
|
f_name = constants.CREATE_LOADBALANCER_FLOW
|
||||||
|
amps_flow = unordered_flow.Flow(f_name)
|
||||||
|
master_amp_sf = self.amp_flows.get_amphora_for_lb_subflow(
|
||||||
|
prefix=constants.ROLE_MASTER, role=constants.ROLE_MASTER)
|
||||||
|
|
||||||
|
backup_amp_sf = self.amp_flows.get_amphora_for_lb_subflow(
|
||||||
|
prefix=constants.ROLE_BACKUP, role=constants.ROLE_BACKUP)
|
||||||
|
amps_flow.add(master_amp_sf, backup_amp_sf)
|
||||||
|
|
||||||
|
return flows + [amps_flow]
|
||||||
|
|
||||||
|
def _create_listeners_flow(self):
|
||||||
|
flows = []
|
||||||
|
flows.append(
|
||||||
database_tasks.ReloadLoadBalancer(
|
database_tasks.ReloadLoadBalancer(
|
||||||
name=constants.RELOAD_LB_AFTER_AMP_ASSOC_FULL_GRAPH,
|
name=constants.RELOAD_LB_AFTER_AMP_ASSOC_FULL_GRAPH,
|
||||||
requires=constants.LOADBALANCER_ID,
|
requires=constants.LOADBALANCER_ID,
|
||||||
provides=constants.LOADBALANCER
|
provides=constants.LOADBALANCER
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
lb_create_graph_flow.add(
|
flows.append(
|
||||||
network_tasks.CalculateDelta(
|
network_tasks.CalculateDelta(
|
||||||
requires=constants.LOADBALANCER, provides=constants.DELTAS
|
requires=constants.LOADBALANCER, provides=constants.DELTAS
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
lb_create_graph_flow.add(
|
flows.append(
|
||||||
network_tasks.HandleNetworkDeltas(
|
network_tasks.HandleNetworkDeltas(
|
||||||
requires=constants.DELTAS, provides=constants.ADDED_PORTS
|
requires=constants.DELTAS, provides=constants.ADDED_PORTS
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
lb_create_graph_flow.add(
|
flows.append(
|
||||||
amphora_driver_tasks.AmphoraePostNetworkPlug(
|
amphora_driver_tasks.AmphoraePostNetworkPlug(
|
||||||
requires=(constants.LOADBALANCER, constants.ADDED_PORTS)
|
requires=(constants.LOADBALANCER, constants.ADDED_PORTS)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
lb_create_graph_flow.add(
|
flows.append(
|
||||||
self.listener_flows.get_create_all_listeners_flow()
|
self.listener_flows.get_create_all_listeners_flow()
|
||||||
)
|
)
|
||||||
|
flows.append(
|
||||||
lb_create_graph_flow.add(database_tasks.MarkLBActiveInDB(
|
database_tasks.MarkLBActiveInDB(
|
||||||
mark_listeners=True,
|
mark_listeners=True, requires=constants.LOADBALANCER
|
||||||
requires=constants.LOADBALANCER)
|
)
|
||||||
)
|
)
|
||||||
return allocate_amphorae_flow, lb_create_graph_flow
|
return flows
|
||||||
|
|
||||||
def get_post_lb_amp_association_flow(self, prefix, topology):
|
def get_post_lb_amp_association_flow(self, prefix, topology):
|
||||||
"""Reload the loadbalancer and create networking subflows for
|
"""Reload the loadbalancer and create networking subflows for
|
||||||
|
@ -189,67 +189,60 @@ class TestLoadBalancerFlows(base.TestCase):
|
|||||||
self.assertEqual(4, len(amp_flow.provides))
|
self.assertEqual(4, len(amp_flow.provides))
|
||||||
self.assertEqual(2, len(amp_flow.requires))
|
self.assertEqual(2, len(amp_flow.requires))
|
||||||
|
|
||||||
def test_get_create_load_balancer_graph_flows(self, mock_get_net_driver):
|
def test_get_create_load_balancer_flows_single_listeners(
|
||||||
allocate_amp_flow, post_amp_flow = (
|
self, mock_get_net_driver):
|
||||||
self.LBFlow.get_create_load_balancer_graph_flows(
|
create_flow = (
|
||||||
constants.TOPOLOGY_SINGLE, '123'
|
self.LBFlow.get_create_load_balancer_flow(
|
||||||
|
constants.TOPOLOGY_SINGLE, True
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
self.assertIsInstance(allocate_amp_flow, flow.Flow)
|
self.assertIsInstance(create_flow, flow.Flow)
|
||||||
self.assertIn(constants.LOADBALANCER_ID, allocate_amp_flow.requires)
|
self.assertIn(constants.LOADBALANCER_ID, create_flow.requires)
|
||||||
|
self.assertIn(constants.UPDATE_DICT, create_flow.requires)
|
||||||
|
|
||||||
self.assertIn(constants.AMPHORA, allocate_amp_flow.provides)
|
self.assertIn(constants.LOADBALANCER, create_flow.provides)
|
||||||
self.assertIn(constants.AMPHORA_ID, allocate_amp_flow.provides)
|
self.assertIn(constants.LISTENERS, create_flow.provides)
|
||||||
self.assertIn(constants.COMPUTE_ID, allocate_amp_flow.provides)
|
self.assertIn(constants.AMPHORA, create_flow.provides)
|
||||||
self.assertIn(constants.COMPUTE_OBJ, allocate_amp_flow.provides)
|
self.assertIn(constants.AMPHORA_ID, create_flow.provides)
|
||||||
|
self.assertIn(constants.COMPUTE_ID, create_flow.provides)
|
||||||
self.assertEqual(1, len(allocate_amp_flow.requires))
|
self.assertIn(constants.COMPUTE_OBJ, create_flow.provides)
|
||||||
self.assertEqual(5, len(allocate_amp_flow.provides),
|
self.assertIn(constants.LOADBALANCER, create_flow.provides)
|
||||||
allocate_amp_flow.provides)
|
self.assertIn(constants.DELTAS, create_flow.provides)
|
||||||
|
self.assertIn(constants.ADDED_PORTS, create_flow.provides)
|
||||||
self.assertIsInstance(post_amp_flow, flow.Flow)
|
self.assertIn(constants.VIP, create_flow.provides)
|
||||||
self.assertIn(constants.LOADBALANCER_ID, post_amp_flow.requires)
|
self.assertIn(constants.AMPS_DATA, create_flow.provides)
|
||||||
self.assertIn(constants.UPDATE_DICT, post_amp_flow.requires)
|
|
||||||
|
|
||||||
self.assertIn(constants.LOADBALANCER, post_amp_flow.provides)
|
|
||||||
self.assertIn(constants.DELTAS, post_amp_flow.provides)
|
|
||||||
self.assertIn(constants.ADDED_PORTS, post_amp_flow.provides)
|
|
||||||
self.assertIn(constants.VIP, post_amp_flow.provides)
|
|
||||||
self.assertIn(constants.AMPS_DATA, post_amp_flow.provides)
|
|
||||||
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG,
|
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG,
|
||||||
post_amp_flow.provides)
|
create_flow.provides)
|
||||||
|
|
||||||
self.assertEqual(2, len(post_amp_flow.requires))
|
self.assertEqual(2, len(create_flow.requires))
|
||||||
self.assertEqual(7, len(post_amp_flow.provides))
|
self.assertEqual(12, len(create_flow.provides),
|
||||||
|
create_flow.provides)
|
||||||
|
|
||||||
# Test Active/Standby
|
def test_get_create_load_balancer_flows_active_standby_listeners(
|
||||||
allocate_amp_flow, post_amp_flow = (
|
self, mock_get_net_driver):
|
||||||
self.LBFlow.get_create_load_balancer_graph_flows(
|
create_flow = (
|
||||||
constants.TOPOLOGY_ACTIVE_STANDBY, '123'
|
self.LBFlow.get_create_load_balancer_flow(
|
||||||
|
constants.TOPOLOGY_ACTIVE_STANDBY, True
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
self.assertIsInstance(allocate_amp_flow, flow.Flow)
|
self.assertIsInstance(create_flow, flow.Flow)
|
||||||
self.assertIn(constants.LOADBALANCER_ID, allocate_amp_flow.requires)
|
self.assertIn(constants.LOADBALANCER_ID, create_flow.requires)
|
||||||
|
self.assertIn(constants.UPDATE_DICT, create_flow.requires)
|
||||||
|
|
||||||
self.assertIn(constants.AMPHORA, allocate_amp_flow.provides)
|
self.assertIn(constants.LOADBALANCER, create_flow.provides)
|
||||||
self.assertIn(constants.AMPHORA_ID, allocate_amp_flow.provides)
|
self.assertIn(constants.LISTENERS, create_flow.provides)
|
||||||
self.assertIn(constants.COMPUTE_ID, allocate_amp_flow.provides)
|
self.assertIn(constants.AMPHORA, create_flow.provides)
|
||||||
self.assertIn(constants.COMPUTE_OBJ, allocate_amp_flow.provides)
|
self.assertIn(constants.AMPHORA_ID, create_flow.provides)
|
||||||
|
self.assertIn(constants.COMPUTE_ID, create_flow.provides)
|
||||||
self.assertEqual(1, len(allocate_amp_flow.requires))
|
self.assertIn(constants.COMPUTE_OBJ, create_flow.provides)
|
||||||
self.assertEqual(5, len(allocate_amp_flow.provides))
|
self.assertIn(constants.LOADBALANCER, create_flow.provides)
|
||||||
|
self.assertIn(constants.DELTAS, create_flow.provides)
|
||||||
self.assertIsInstance(post_amp_flow, flow.Flow)
|
self.assertIn(constants.ADDED_PORTS, create_flow.provides)
|
||||||
self.assertIn(constants.LOADBALANCER_ID, post_amp_flow.requires)
|
self.assertIn(constants.VIP, create_flow.provides)
|
||||||
self.assertIn(constants.UPDATE_DICT, post_amp_flow.requires)
|
self.assertIn(constants.AMPS_DATA, create_flow.provides)
|
||||||
|
|
||||||
self.assertIn(constants.LOADBALANCER, post_amp_flow.provides)
|
|
||||||
self.assertIn(constants.DELTAS, post_amp_flow.provides)
|
|
||||||
self.assertIn(constants.ADDED_PORTS, post_amp_flow.provides)
|
|
||||||
self.assertIn(constants.VIP, post_amp_flow.provides)
|
|
||||||
self.assertIn(constants.AMPS_DATA, post_amp_flow.provides)
|
|
||||||
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG,
|
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG,
|
||||||
post_amp_flow.provides)
|
create_flow.provides)
|
||||||
|
|
||||||
self.assertEqual(2, len(post_amp_flow.requires))
|
self.assertEqual(2, len(create_flow.requires))
|
||||||
self.assertEqual(7, len(post_amp_flow.provides))
|
self.assertEqual(12, len(create_flow.provides),
|
||||||
|
create_flow.provides)
|
||||||
|
@ -370,25 +370,23 @@ class TestControllerWorker(base.TestCase):
|
|||||||
|
|
||||||
_flow_mock.run.assert_called_once_with()
|
_flow_mock.run.assert_called_once_with()
|
||||||
|
|
||||||
@mock.patch('octavia.controller.worker.flows.load_balancer_flows.'
|
|
||||||
'LoadBalancerFlows.get_post_lb_amp_association_flow')
|
|
||||||
@mock.patch('octavia.controller.worker.flows.load_balancer_flows.'
|
@mock.patch('octavia.controller.worker.flows.load_balancer_flows.'
|
||||||
'LoadBalancerFlows.get_create_load_balancer_flow',
|
'LoadBalancerFlows.get_create_load_balancer_flow',
|
||||||
return_value=_flow_mock)
|
return_value=_flow_mock)
|
||||||
def test_create_load_balancer(self,
|
def test_create_load_balancer_single(
|
||||||
mock_get_create_load_balancer_flow,
|
self,
|
||||||
mock_get_get_post_lb_amp_association_flow,
|
mock_get_create_load_balancer_flow,
|
||||||
mock_api_get_session,
|
mock_api_get_session,
|
||||||
mock_dyn_log_listener,
|
mock_dyn_log_listener,
|
||||||
mock_taskflow_load,
|
mock_taskflow_load,
|
||||||
mock_pool_repo_get,
|
mock_pool_repo_get,
|
||||||
mock_member_repo_get,
|
mock_member_repo_get,
|
||||||
mock_l7rule_repo_get,
|
mock_l7rule_repo_get,
|
||||||
mock_l7policy_repo_get,
|
mock_l7policy_repo_get,
|
||||||
mock_listener_repo_get,
|
mock_listener_repo_get,
|
||||||
mock_lb_repo_get,
|
mock_lb_repo_get,
|
||||||
mock_health_mon_repo_get,
|
mock_health_mon_repo_get,
|
||||||
mock_amp_repo_get):
|
mock_amp_repo_get):
|
||||||
|
|
||||||
# Test the code path with an SINGLE topology
|
# Test the code path with an SINGLE topology
|
||||||
CONF.set_override(group='controller_worker',
|
CONF.set_override(group='controller_worker',
|
||||||
@ -398,61 +396,68 @@ class TestControllerWorker(base.TestCase):
|
|||||||
_flow_mock.reset_mock()
|
_flow_mock.reset_mock()
|
||||||
mock_taskflow_load.reset_mock()
|
mock_taskflow_load.reset_mock()
|
||||||
mock_eng = mock.Mock()
|
mock_eng = mock.Mock()
|
||||||
mock_eng_post = mock.Mock()
|
mock_taskflow_load.return_value = mock_eng
|
||||||
mock_taskflow_load.side_effect = [mock_eng, mock_eng_post]
|
store = {
|
||||||
_post_flow = mock.MagicMock()
|
constants.LOADBALANCER_ID: LB_ID,
|
||||||
mock_get_get_post_lb_amp_association_flow.return_value = _post_flow
|
'update_dict': {'topology': constants.TOPOLOGY_SINGLE}
|
||||||
store = {constants.LOADBALANCER_ID: LB_ID,
|
}
|
||||||
'update_dict': {'topology': 'SINGLE'}}
|
|
||||||
setattr(mock_lb_repo_get.return_value, 'listeners', [])
|
setattr(mock_lb_repo_get.return_value, 'listeners', [])
|
||||||
|
|
||||||
cw = controller_worker.ControllerWorker()
|
cw = controller_worker.ControllerWorker()
|
||||||
cw.create_load_balancer(LB_ID)
|
cw.create_load_balancer(LB_ID)
|
||||||
|
|
||||||
calls = [mock.call(_flow_mock, store=store),
|
mock_get_create_load_balancer_flow.assert_called_with(
|
||||||
mock.call(_post_flow, store=store)]
|
topology=constants.TOPOLOGY_SINGLE, listeners=[])
|
||||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
mock_taskflow_load.assert_called_with(
|
||||||
assert_has_calls(calls, any_order=True))
|
mock_get_create_load_balancer_flow.return_value, store=store)
|
||||||
mock_eng.run.assert_any_call()
|
mock_eng.run.assert_any_call()
|
||||||
mock_eng_post.run.assert_any_call()
|
|
||||||
|
|
||||||
# Test the code path with an ACTIVE_STANDBY topology
|
@mock.patch('octavia.controller.worker.flows.load_balancer_flows.'
|
||||||
|
'LoadBalancerFlows.get_create_load_balancer_flow',
|
||||||
|
return_value=_flow_mock)
|
||||||
|
def test_create_load_balancer_active_standby(
|
||||||
|
self,
|
||||||
|
mock_get_create_load_balancer_flow,
|
||||||
|
mock_api_get_session,
|
||||||
|
mock_dyn_log_listener,
|
||||||
|
mock_taskflow_load,
|
||||||
|
mock_pool_repo_get,
|
||||||
|
mock_member_repo_get,
|
||||||
|
mock_l7rule_repo_get,
|
||||||
|
mock_l7policy_repo_get,
|
||||||
|
mock_listener_repo_get,
|
||||||
|
mock_lb_repo_get,
|
||||||
|
mock_health_mon_repo_get,
|
||||||
|
mock_amp_repo_get):
|
||||||
|
|
||||||
CONF.set_override(group='controller_worker',
|
CONF.set_override(group='controller_worker',
|
||||||
name='loadbalancer_topology',
|
name='loadbalancer_topology',
|
||||||
override=constants.TOPOLOGY_ACTIVE_STANDBY,
|
override=constants.TOPOLOGY_ACTIVE_STANDBY,
|
||||||
enforce_type=True)
|
enforce_type=True)
|
||||||
|
|
||||||
_flow_mock.reset_mock()
|
_flow_mock.reset_mock()
|
||||||
mock_taskflow_load.reset_mock()
|
mock_taskflow_load.reset_mock()
|
||||||
mock_eng = mock.Mock()
|
mock_eng = mock.Mock()
|
||||||
mock_eng_post = mock.Mock()
|
mock_taskflow_load.return_value = mock_eng
|
||||||
mock_taskflow_load.side_effect = [mock_eng, mock_eng_post]
|
store = {
|
||||||
_post_flow = mock.MagicMock()
|
constants.LOADBALANCER_ID: LB_ID,
|
||||||
mock_get_get_post_lb_amp_association_flow.return_value = _post_flow
|
'update_dict': {'topology': constants.TOPOLOGY_ACTIVE_STANDBY}
|
||||||
store = {constants.LOADBALANCER_ID: LB_ID,
|
}
|
||||||
'update_dict': {'topology': 'ACTIVE_STANDBY'}}
|
setattr(mock_lb_repo_get.return_value, 'listeners', [])
|
||||||
|
|
||||||
cw = controller_worker.ControllerWorker()
|
cw = controller_worker.ControllerWorker()
|
||||||
cw.create_load_balancer(LB_ID)
|
cw.create_load_balancer(LB_ID)
|
||||||
|
|
||||||
calls = [mock.call(_flow_mock, store=store),
|
mock_get_create_load_balancer_flow.assert_called_with(
|
||||||
mock.call(_post_flow, store=store)]
|
topology=constants.TOPOLOGY_ACTIVE_STANDBY, listeners=[])
|
||||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
mock_taskflow_load.assert_called_with(
|
||||||
assert_has_calls(calls, any_order=True))
|
mock_get_create_load_balancer_flow.return_value, store=store)
|
||||||
mock_eng.run.assert_any_call()
|
mock_eng.run.assert_any_call()
|
||||||
mock_eng_post.run.assert_any_call()
|
|
||||||
|
|
||||||
@mock.patch('octavia.controller.worker.flows.load_balancer_flows.'
|
|
||||||
'LoadBalancerFlows.get_post_lb_amp_association_flow')
|
|
||||||
@mock.patch('octavia.controller.worker.flows.load_balancer_flows.'
|
@mock.patch('octavia.controller.worker.flows.load_balancer_flows.'
|
||||||
'LoadBalancerFlows.get_create_load_balancer_flow')
|
'LoadBalancerFlows.get_create_load_balancer_flow')
|
||||||
@mock.patch('octavia.controller.worker.flows.load_balancer_flows.'
|
def test_create_load_balancer_full_graph_single(
|
||||||
'LoadBalancerFlows.get_create_load_balancer_graph_flows')
|
|
||||||
def test_create_load_balancer_full_graph(
|
|
||||||
self,
|
self,
|
||||||
mock_get_create_load_balancer_graph_flows,
|
|
||||||
mock_get_create_load_balancer_flow,
|
mock_get_create_load_balancer_flow,
|
||||||
mock_get_post_lb_amp_association_flow,
|
|
||||||
mock_api_get_session,
|
mock_api_get_session,
|
||||||
mock_dyn_log_listener,
|
mock_dyn_log_listener,
|
||||||
mock_taskflow_load,
|
mock_taskflow_load,
|
||||||
@ -473,59 +478,70 @@ class TestControllerWorker(base.TestCase):
|
|||||||
lb = data_models.LoadBalancer(id=LB_ID, listeners=listeners)
|
lb = data_models.LoadBalancer(id=LB_ID, listeners=listeners)
|
||||||
mock_lb_repo_get.return_value = lb
|
mock_lb_repo_get.return_value = lb
|
||||||
mock_eng = mock.Mock()
|
mock_eng = mock.Mock()
|
||||||
mock_eng_post = mock.Mock()
|
mock_taskflow_load.return_value = mock_eng
|
||||||
mock_taskflow_load.side_effect = [mock_eng, mock_eng_post]
|
store = {
|
||||||
_post_flow = mock.MagicMock()
|
constants.LOADBALANCER_ID: LB_ID,
|
||||||
mock_get_create_load_balancer_graph_flows.return_value = (
|
'update_dict': {'topology': constants.TOPOLOGY_SINGLE}
|
||||||
_flow_mock, _post_flow
|
}
|
||||||
)
|
|
||||||
store = {constants.LOADBALANCER_ID: LB_ID,
|
|
||||||
'update_dict': {'topology': 'SINGLE'}}
|
|
||||||
|
|
||||||
cw = controller_worker.ControllerWorker()
|
cw = controller_worker.ControllerWorker()
|
||||||
cw.create_load_balancer(LB_ID)
|
cw.create_load_balancer(LB_ID)
|
||||||
|
|
||||||
calls = [mock.call(_flow_mock, store=store),
|
# mock_create_single_topology.assert_called_once()
|
||||||
mock.call(_post_flow, store=store)]
|
# mock_create_active_standby_topology.assert_not_called()
|
||||||
mock_taskflow_load.assert_has_calls(calls, any_order=True)
|
mock_get_create_load_balancer_flow.assert_called_with(
|
||||||
|
topology=constants.TOPOLOGY_SINGLE, listeners=lb.listeners)
|
||||||
|
mock_taskflow_load.assert_called_with(
|
||||||
|
mock_get_create_load_balancer_flow.return_value, store=store)
|
||||||
mock_eng.run.assert_any_call()
|
mock_eng.run.assert_any_call()
|
||||||
mock_eng_post.run.assert_any_call()
|
|
||||||
mock_get_create_load_balancer_graph_flows.assert_called_once_with(
|
|
||||||
'SINGLE', 'post-amphora-association'
|
|
||||||
)
|
|
||||||
self.assertFalse(mock_get_create_load_balancer_flow.called)
|
|
||||||
self.assertFalse(mock_get_post_lb_amp_association_flow.called)
|
|
||||||
|
|
||||||
# Test code path for active standby full lb graph creation
|
@mock.patch('octavia.controller.worker.flows.load_balancer_flows.'
|
||||||
|
'LoadBalancerFlows.get_create_load_balancer_flow')
|
||||||
|
@mock.patch('octavia.controller.worker.flows.load_balancer_flows.'
|
||||||
|
'LoadBalancerFlows._create_single_topology')
|
||||||
|
@mock.patch('octavia.controller.worker.flows.load_balancer_flows.'
|
||||||
|
'LoadBalancerFlows._create_active_standby_topology')
|
||||||
|
def test_create_load_balancer_full_graph_active_standby(
|
||||||
|
self,
|
||||||
|
mock_create_active_standby_topology,
|
||||||
|
mock_create_single_topology,
|
||||||
|
mock_get_create_load_balancer_flow,
|
||||||
|
mock_api_get_session,
|
||||||
|
mock_dyn_log_listener,
|
||||||
|
mock_taskflow_load,
|
||||||
|
mock_pool_repo_get,
|
||||||
|
mock_member_repo_get,
|
||||||
|
mock_l7rule_repo_get,
|
||||||
|
mock_l7policy_repo_get,
|
||||||
|
mock_listener_repo_get,
|
||||||
|
mock_lb_repo_get,
|
||||||
|
mock_health_mon_repo_get,
|
||||||
|
mock_amp_repo_get):
|
||||||
CONF.set_override(group='controller_worker',
|
CONF.set_override(group='controller_worker',
|
||||||
name='loadbalancer_topology',
|
name='loadbalancer_topology',
|
||||||
override=constants.TOPOLOGY_ACTIVE_STANDBY)
|
override=constants.TOPOLOGY_ACTIVE_STANDBY,
|
||||||
_flow_mock.reset_mock()
|
enforce_type=True)
|
||||||
mock_get_create_load_balancer_graph_flows.reset_mock()
|
listeners = [data_models.Listener(id='listener1'),
|
||||||
mock_taskflow_load.reset_mock()
|
data_models.Listener(id='listener2')]
|
||||||
|
lb = data_models.LoadBalancer(id=LB_ID, listeners=listeners)
|
||||||
|
mock_lb_repo_get.return_value = lb
|
||||||
mock_eng = mock.Mock()
|
mock_eng = mock.Mock()
|
||||||
mock_eng_post = mock.Mock()
|
mock_taskflow_load.return_value = mock_eng
|
||||||
mock_taskflow_load.side_effect = [mock_eng, mock_eng_post]
|
store = {
|
||||||
_post_flow = mock.MagicMock()
|
constants.LOADBALANCER_ID: LB_ID,
|
||||||
mock_get_create_load_balancer_graph_flows.return_value = (
|
'update_dict': {'topology': constants.TOPOLOGY_ACTIVE_STANDBY}
|
||||||
_flow_mock, _post_flow
|
}
|
||||||
)
|
|
||||||
store = {constants.LOADBALANCER_ID: LB_ID,
|
|
||||||
'update_dict': {'topology': 'ACTIVE_STANDBY'}}
|
|
||||||
|
|
||||||
cw = controller_worker.ControllerWorker()
|
cw = controller_worker.ControllerWorker()
|
||||||
cw.create_load_balancer(LB_ID)
|
cw.create_load_balancer(LB_ID)
|
||||||
|
|
||||||
calls = [mock.call(_flow_mock, store=store),
|
# mock_create_single_topology.assert_not_called()
|
||||||
mock.call(_post_flow, store=store)]
|
# mock_create_active_standby_topology.assert_called_once()
|
||||||
mock_taskflow_load.assert_has_calls(calls, any_order=True)
|
mock_get_create_load_balancer_flow.assert_called_with(
|
||||||
|
topology=constants.TOPOLOGY_ACTIVE_STANDBY, listeners=lb.listeners)
|
||||||
|
mock_taskflow_load.assert_called_with(
|
||||||
|
mock_get_create_load_balancer_flow.return_value, store=store)
|
||||||
mock_eng.run.assert_any_call()
|
mock_eng.run.assert_any_call()
|
||||||
mock_eng_post.run.assert_any_call()
|
|
||||||
mock_get_create_load_balancer_graph_flows.assert_called_once_with(
|
|
||||||
'ACTIVE_STANDBY', 'post-amphora-association'
|
|
||||||
)
|
|
||||||
self.assertFalse(mock_get_create_load_balancer_flow.called)
|
|
||||||
self.assertFalse(mock_get_post_lb_amp_association_flow.called)
|
|
||||||
|
|
||||||
@mock.patch('octavia.controller.worker.flows.load_balancer_flows.'
|
@mock.patch('octavia.controller.worker.flows.load_balancer_flows.'
|
||||||
'LoadBalancerFlows.get_delete_load_balancer_flow',
|
'LoadBalancerFlows.get_delete_load_balancer_flow',
|
||||||
|
@ -56,15 +56,6 @@ def generate(flow_list, output_directory):
|
|||||||
current_engine = engines.load(
|
current_engine = engines.load(
|
||||||
get_flow_method(
|
get_flow_method(
|
||||||
constants.TOPOLOGY_ACTIVE_STANDBY))
|
constants.TOPOLOGY_ACTIVE_STANDBY))
|
||||||
elif (current_tuple[1] == 'LoadBalancerFlows' and
|
|
||||||
current_tuple[2] == 'get_create_load_balancer_graph_flows'):
|
|
||||||
# This is lame until we refactor the create load balancer
|
|
||||||
# flow into one flow now that
|
|
||||||
# https://bugs.launchpad.net/taskflow/+bug/1479466
|
|
||||||
# is fixed.
|
|
||||||
allocate_amp_flow, lb_create_graph_flow = get_flow_method(
|
|
||||||
constants.TOPOLOGY_ACTIVE_STANDBY, 'prefixname')
|
|
||||||
current_engine = engines.load(lb_create_graph_flow)
|
|
||||||
elif (current_tuple[1] == 'LoadBalancerFlows' and
|
elif (current_tuple[1] == 'LoadBalancerFlows' and
|
||||||
current_tuple[2] == 'get_delete_load_balancer_flow'):
|
current_tuple[2] == 'get_delete_load_balancer_flow'):
|
||||||
lb = dmh.generate_load_balancer()
|
lb = dmh.generate_load_balancer()
|
||||||
|
@ -6,7 +6,6 @@ octavia.controller.worker.flows.amphora_flows AmphoraFlows get_create_amphora_fl
|
|||||||
octavia.controller.worker.flows.amphora_flows AmphoraFlows get_failover_flow
|
octavia.controller.worker.flows.amphora_flows AmphoraFlows get_failover_flow
|
||||||
octavia.controller.worker.flows.amphora_flows AmphoraFlows cert_rotate_amphora_flow
|
octavia.controller.worker.flows.amphora_flows AmphoraFlows cert_rotate_amphora_flow
|
||||||
octavia.controller.worker.flows.load_balancer_flows LoadBalancerFlows get_create_load_balancer_flow
|
octavia.controller.worker.flows.load_balancer_flows LoadBalancerFlows get_create_load_balancer_flow
|
||||||
octavia.controller.worker.flows.load_balancer_flows LoadBalancerFlows get_create_load_balancer_graph_flows
|
|
||||||
octavia.controller.worker.flows.load_balancer_flows LoadBalancerFlows get_delete_load_balancer_flow
|
octavia.controller.worker.flows.load_balancer_flows LoadBalancerFlows get_delete_load_balancer_flow
|
||||||
octavia.controller.worker.flows.load_balancer_flows LoadBalancerFlows get_cascade_delete_load_balancer_flow
|
octavia.controller.worker.flows.load_balancer_flows LoadBalancerFlows get_cascade_delete_load_balancer_flow
|
||||||
octavia.controller.worker.flows.load_balancer_flows LoadBalancerFlows get_update_load_balancer_flow
|
octavia.controller.worker.flows.load_balancer_flows LoadBalancerFlows get_update_load_balancer_flow
|
||||||
|
Loading…
Reference in New Issue
Block a user