From 4a1acafa12280710a963cb0ec92c549b40a833cf Mon Sep 17 00:00:00 2001 From: Trevor Vardeman Date: Sun, 14 Feb 2016 17:52:46 -0600 Subject: [PATCH] Get Me A Load Balancer Controller Sets up the flows and some new tasks required to create all the resources needed for an entire load balancer graph. This includes updating all listeners on all amphorae (depending on topology), and plugging networks and setting up the routes and rules on every amphora for a load balancer. Luckily this mostly reuses tasks and flows that were already created, though some new tasks and flows were created specifically for handling many listeners. Co-Authored-By: Trevor Vardeman Change-Id: I43a838e80281a37537e179cd8d4768f45e1ca7f1 --- octavia/common/constants.py | 3 + .../controller/worker/controller_worker.py | 39 +++++++-- .../controller/worker/flows/listener_flows.py | 20 +++++ .../worker/flows/load_balancer_flows.py | 41 +++++++++ .../worker/tasks/amphora_driver_tasks.py | 23 +++-- .../controller/worker/tasks/database_tasks.py | 32 ++++++- octavia/db/repositories.py | 5 +- .../worker/flows/test_amphora_flows.py | 15 +--- .../worker/flows/test_listener_flows.py | 9 ++ .../worker/flows/test_load_balancer_flows.py | 85 +++++++++++++++++- .../worker/tasks/test_amphora_driver_tasks.py | 40 ++++++++- .../worker/tasks/test_database_tasks.py | 45 +++++++++- .../worker/test_controller_worker.py | 87 +++++++++++++++++++ 13 files changed, 407 insertions(+), 37 deletions(-) diff --git a/octavia/common/constants.py b/octavia/common/constants.py index 57ae5ed36f..3eb7bdc09c 100644 --- a/octavia/common/constants.py +++ b/octavia/common/constants.py @@ -171,7 +171,9 @@ CREATE_LISTENER_FLOW = 'octavia-create-listener_flow' PRE_CREATE_LOADBALANCER_FLOW = 'octavia-pre-create-loadbalancer-flow' CREATE_SERVER_GROUP_FLOW = 'octavia-create-server-group-flow' UPDATE_LB_SERVERGROUPID_FLOW = 'octavia-update-lb-server-group-id-flow' +CREATE_LISTENERS_FLOW = 'octavia-create-all-listeners-flow' CREATE_LOADBALANCER_FLOW = 'octavia-create-loadbalancer-flow' +CREATE_LOADBALANCER_GRAPH_FLOW = 'octavia-create-loadbalancer-graph-flow' CREATE_MEMBER_FLOW = 'octavia-create-member-flow' CREATE_POOL_FLOW = 'octavia-create-pool-flow' CREATE_L7POLICY_FLOW = 'octavia-create-l7policy-flow' @@ -234,6 +236,7 @@ GENERATE_SERVER_PEM_TASK = 'GenerateServerPEMTask' # Task Names RELOAD_LB_AFTER_AMP_ASSOC = 'reload-lb-after-amp-assoc' +RELOAD_LB_AFTER_AMP_ASSOC_FULL_GRAPH = 'reload-lb-after-amp-assoc-full-graph' RELOAD_LB_AFTER_PLUG_VIP = 'reload-lb-after-plug-vip' NOVA_1 = '1.1' diff --git a/octavia/controller/worker/controller_worker.py b/octavia/controller/worker/controller_worker.py index 6c1c999670..6b248e60aa 100644 --- a/octavia/controller/worker/controller_worker.py +++ b/octavia/controller/worker/controller_worker.py @@ -247,6 +247,29 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): with tf_logging.DynamicLoggingListener(update_listener_tf, log=LOG): update_listener_tf.run() + def _get_create_load_balancer_flows(self, load_balancer, topology): + # if listeners exist then this was a request to create many resources + # at once, so different logic will be needed. + post_amp_prefix = 'post-amphora-association' + if load_balancer.listeners: + allocate_amphorae_flow, post_lb_amp_assoc_flow = ( + self._lb_flows.get_create_load_balancer_graph_flows( + topology, post_amp_prefix + ) + ) + else: + allocate_amphorae_flow = ( + self._lb_flows.get_create_load_balancer_flow( + topology=topology + ) + ) + post_lb_amp_assoc_flow = ( + self._lb_flows.get_post_lb_amp_association_flow( + prefix=post_amp_prefix, topology=topology + ) + ) + return allocate_amphorae_flow, post_lb_amp_assoc_flow + def create_load_balancer(self, load_balancer_id): """Creates a load balancer by allocating Amphorae. @@ -277,14 +300,15 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): # blogan and sbalukoff asked to remove the else check here # as it is also checked later in the flow create code - create_lb_tf = self._taskflow_load( - self._lb_flows.get_create_load_balancer_flow( - topology=CONF.controller_worker.loadbalancer_topology), - store=store) + lb = self._lb_repo.get(db_apis.get_session(), id=load_balancer_id) + allocate_amphorae_flow, post_lb_amp_assoc_flow = ( + self._get_create_load_balancer_flows(lb, topology) + ) + + create_lb_tf = self._taskflow_load(allocate_amphorae_flow, store=store) with tf_logging.DynamicLoggingListener( create_lb_tf, log=LOG, hide_inputs_outputs_of=self._exclude_result_logging_tasks): - create_lb_tf.run() # Ideally the following flow should be integrated with the @@ -292,10 +316,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): # current version of taskflow as it flatten out the flows. # Bug report: https://bugs.launchpad.net/taskflow/+bug/1479466 post_lb_amp_assoc = self._taskflow_load( - self._lb_flows.get_post_lb_amp_association_flow( - prefix='post-amphora-association', - topology=CONF.controller_worker.loadbalancer_topology), - store=store) + post_lb_amp_assoc_flow, store=store) with tf_logging.DynamicLoggingListener(post_lb_amp_assoc, log=LOG): post_lb_amp_assoc.run() diff --git a/octavia/controller/worker/flows/listener_flows.py b/octavia/controller/worker/flows/listener_flows.py index 6735092afb..45d888d479 100644 --- a/octavia/controller/worker/flows/listener_flows.py +++ b/octavia/controller/worker/flows/listener_flows.py @@ -40,6 +40,26 @@ class ListenerFlows(object): constants.LISTENERS])) return create_listener_flow + def get_create_all_listeners_flow(self): + """Create a flow to create all listeners + + :returns: The flow for creating all listeners + """ + create_all_listeners_flow = linear_flow.Flow( + constants.CREATE_LISTENERS_FLOW) + create_all_listeners_flow.add( + database_tasks.GetListenersFromLoadbalancer( + requires=constants.LOADBALANCER, + provides=constants.LISTENERS)) + create_all_listeners_flow.add(database_tasks.ReloadLoadBalancer( + requires=constants.LOADBALANCER_ID, + provides=constants.LOADBALANCER)) + create_all_listeners_flow.add(amphora_driver_tasks.ListenersUpdate( + requires=[constants.LOADBALANCER, constants.LISTENERS])) + create_all_listeners_flow.add(network_tasks.UpdateVIP( + requires=constants.LOADBALANCER)) + return create_all_listeners_flow + def get_delete_listener_flow(self): """Create a flow to delete a listener diff --git a/octavia/controller/worker/flows/load_balancer_flows.py b/octavia/controller/worker/flows/load_balancer_flows.py index 9217921ace..9de65c00d1 100644 --- a/octavia/controller/worker/flows/load_balancer_flows.py +++ b/octavia/controller/worker/flows/load_balancer_flows.py @@ -22,6 +22,7 @@ from octavia.common import constants from octavia.common import exceptions from octavia.controller.worker.flows import amphora_flows from octavia.controller.worker.flows import listener_flows +from octavia.controller.worker.flows import member_flows from octavia.controller.worker.flows import pool_flows from octavia.controller.worker.tasks import amphora_driver_tasks from octavia.controller.worker.tasks import compute_tasks @@ -43,6 +44,7 @@ class LoadBalancerFlows(object): self.amp_flows = amphora_flows.AmphoraFlows() self.listener_flows = listener_flows.ListenerFlows() self.pool_flows = pool_flows.PoolFlows() + self.member_flows = member_flows.MemberFlows() def get_create_load_balancer_flow(self, topology): """Creates a conditional graph flow that allocates a loadbalancer to @@ -102,6 +104,45 @@ class LoadBalancerFlows(object): create_lb_flow_wrapper.add(lb_create_flow) return create_lb_flow_wrapper + def get_create_load_balancer_graph_flows(self, topology, prefix): + allocate_amphorae_flow = self.get_create_load_balancer_flow(topology) + f_name = constants.CREATE_LOADBALANCER_GRAPH_FLOW + lb_create_graph_flow = linear_flow.Flow(f_name) + lb_create_graph_flow.add( + self.get_post_lb_amp_association_flow(prefix, topology) + ) + lb_create_graph_flow.add( + database_tasks.ReloadLoadBalancer( + name=constants.RELOAD_LB_AFTER_AMP_ASSOC_FULL_GRAPH, + requires=constants.LOADBALANCER_ID, + provides=constants.LOADBALANCER + ) + ) + lb_create_graph_flow.add( + network_tasks.CalculateDelta( + requires=constants.LOADBALANCER, provides=constants.DELTAS + ) + ) + lb_create_graph_flow.add( + network_tasks.HandleNetworkDeltas( + requires=constants.DELTAS, provides=constants.ADDED_PORTS + ) + ) + lb_create_graph_flow.add( + amphora_driver_tasks.AmphoraePostNetworkPlug( + requires=(constants.LOADBALANCER, constants.ADDED_PORTS) + ) + ) + lb_create_graph_flow.add( + self.listener_flows.get_create_all_listeners_flow() + ) + + lb_create_graph_flow.add(database_tasks.MarkLBActiveInDB( + mark_listeners=True, + requires=constants.LOADBALANCER) + ) + return allocate_amphorae_flow, lb_create_graph_flow + def get_post_lb_amp_association_flow(self, prefix, topology): """Reload the loadbalancer and create networking subflows for diff --git a/octavia/controller/worker/tasks/amphora_driver_tasks.py b/octavia/controller/worker/tasks/amphora_driver_tasks.py index f8f34b274e..7ec402dcb1 100644 --- a/octavia/controller/worker/tasks/amphora_driver_tasks.py +++ b/octavia/controller/worker/tasks/amphora_driver_tasks.py @@ -52,15 +52,21 @@ class ListenersUpdate(BaseAmphoraTask): def execute(self, loadbalancer, listeners): """Execute updates per listener for an amphora.""" for listener in listeners: + listener.load_balancer = loadbalancer self.amphora_driver.update(listener, loadbalancer.vip) - def revert(self, listeners, *args, **kwargs): + def revert(self, loadbalancer, *args, **kwargs): """Handle failed listeners updates.""" LOG.warn(_LW("Reverting listeners updates.")) - for listener in listeners: - self.listener_repo.update(db_apis.get_session(), id=listener.id, - provisioning_status=constants.ERROR) + for listener in loadbalancer.listeners: + try: + self.listener_repo.update(db_apis.get_session(), + id=listener.id, + provisioning_status=constants.ERROR) + except Exception: + LOG.warn(_LW("Failed to update listener %s provisioning " + "status..."), listener.id) return None @@ -112,8 +118,13 @@ class ListenersStart(BaseAmphoraTask): LOG.warn(_LW("Reverting listeners starts.")) for listener in listeners: - self.listener_repo.update(db_apis.get_session(), id=listener.id, - provisioning_status=constants.ERROR) + try: + self.listener_repo.update(db_apis.get_session(), + id=listener.id, + provisioning_status=constants.ERROR) + except Exception: + LOG.warn(_LW("Failed to update listener %s provisioning " + "status..."), listener.id) return None diff --git a/octavia/controller/worker/tasks/database_tasks.py b/octavia/controller/worker/tasks/database_tasks.py index d4b57bf805..6b77346a73 100755 --- a/octavia/controller/worker/tasks/database_tasks.py +++ b/octavia/controller/worker/tasks/database_tasks.py @@ -690,9 +690,21 @@ class MarkLBActiveInDB(BaseDatabaseTask): Since sqlalchemy will likely retry by itself always revert if it fails """ + def __init__(self, mark_listeners=False, **kwargs): + super(MarkLBActiveInDB, self).__init__(**kwargs) + self.mark_listeners = mark_listeners + def execute(self, loadbalancer): """Mark the load balancer as active in DB.""" + if self.mark_listeners: + LOG.debug("Marking all listeners of loadbalancer %s ACTIVE", + loadbalancer.id) + for listener in loadbalancer.listeners: + self.listener_repo.update(db_apis.get_session(), + listener.id, + provisioning_status=constants.ACTIVE) + LOG.info(_LI("Mark ACTIVE in DB for load balancer id: %s"), loadbalancer.id) self.loadbalancer_repo.update(db_apis.get_session(), @@ -702,6 +714,18 @@ class MarkLBActiveInDB(BaseDatabaseTask): def revert(self, loadbalancer, *args, **kwargs): """Mark the load balancer as broken and ready to be cleaned up.""" + if self.mark_listeners: + LOG.debug("Marking all listeners of loadbalancer %s ERROR", + loadbalancer.id) + for listener in loadbalancer.listeners: + try: + self.listener_repo.update( + db_apis.get_session(), listener.id, + provisioning_status=constants.ERROR) + except Exception: + LOG.warn(_LW("Error updating listener %s provisioning " + "status"), listener.id) + LOG.warn(_LW("Reverting mark load balancer deleted in DB " "for load balancer id %s"), loadbalancer.id) self.loadbalancer_repo.update(db_apis.get_session(), @@ -811,8 +835,12 @@ class MarkLBAndListenersActiveInDB(BaseDatabaseTask): loadbalancer.id, provisioning_status=constants.ERROR) for listener in listeners: - self.listener_repo.update(db_apis.get_session(), listener.id, - provisioning_status=constants.ERROR) + try: + self.listener_repo.update(db_apis.get_session(), listener.id, + provisioning_status=constants.ERROR) + except Exception: + LOG.warn(_LW("Failed to update listener %s provisioning " + "status..."), listener.id) class MarkListenerActiveInDB(BaseDatabaseTask): diff --git a/octavia/db/repositories.py b/octavia/db/repositories.py index df3311bc24..b5365453c0 100644 --- a/octavia/db/repositories.py +++ b/octavia/db/repositories.py @@ -834,16 +834,13 @@ class L7PolicyRepository(BaseRepository): session.add(l7policy) session.flush() - listener = (session.query(models.Listener). - filter_by(id=l7policy.listener_id).first()) - session.refresh(listener) - # Must be done outside the transaction which creates the L7Policy listener = (session.query(models.Listener). filter_by(id=l7policy.listener_id).first()) # Immediate refresh, as we have found that sqlalchemy will sometimes # cache the above query session.refresh(listener) + session.refresh(l7policy) if position is not None and position < len(listener.l7policies) + 1: with session.begin(subtransactions=True): diff --git a/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py b/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py index 37f7f59701..c08dc97025 100644 --- a/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py +++ b/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py @@ -28,6 +28,8 @@ AUTH_VERSION = '2' class TestAmphoraFlows(base.TestCase): def setUp(self): + super(TestAmphoraFlows, self).setUp() + old_amp_driver = cfg.CONF.controller_worker.amphora_driver cfg.CONF.set_override('amphora_driver', 'amphora_haproxy_rest_driver', group='controller_worker') cfg.CONF.set_override('enable_anti_affinity', False, @@ -36,7 +38,8 @@ class TestAmphoraFlows(base.TestCase): conf = oslo_fixture.Config(cfg.CONF) conf.config(group="keystone_authtoken", auth_version=AUTH_VERSION) - super(TestAmphoraFlows, self).setUp() + self.addCleanup(cfg.CONF.set_override, 'amphora_driver', + old_amp_driver, group='controller_worker') def test_get_create_amphora_flow(self): @@ -53,8 +56,6 @@ class TestAmphoraFlows(base.TestCase): self.assertEqual(0, len(amp_flow.requires)) def test_get_create_amphora_flow_cert(self): - cfg.CONF.set_override('amphora_driver', 'amphora_haproxy_rest_driver', - group='controller_worker') self.AmpFlow = amphora_flows.AmphoraFlows() amp_flow = self.AmpFlow.get_create_amphora_flow() @@ -69,8 +70,6 @@ class TestAmphoraFlows(base.TestCase): self.assertEqual(0, len(amp_flow.requires)) def test_get_create_amphora_for_lb_flow(self): - cfg.CONF.set_override('amphora_driver', 'amphora_haproxy_rest_driver', - group='controller_worker') amp_flow = self.AmpFlow._get_create_amp_for_lb_subflow( 'SOMEPREFIX', constants.ROLE_STANDALONE) @@ -155,8 +154,6 @@ class TestAmphoraFlows(base.TestCase): self.assertEqual(2, len(amp_flow.requires)) def test_get_cert_backup_create_amphora_for_lb_flow(self): - cfg.CONF.set_override('amphora_driver', 'amphora_haproxy_rest_driver', - group='controller_worker') self.AmpFlow = amphora_flows.AmphoraFlows() amp_flow = self.AmpFlow._get_create_amp_for_lb_subflow( @@ -176,8 +173,6 @@ class TestAmphoraFlows(base.TestCase): self.assertEqual(1, len(amp_flow.requires)) def test_get_cert_bogus_create_amphora_for_lb_flow(self): - cfg.CONF.set_override('amphora_driver', 'amphora_haproxy_rest_driver', - group='controller_worker') self.AmpFlow = amphora_flows.AmphoraFlows() amp_flow = self.AmpFlow._get_create_amp_for_lb_subflow( @@ -337,8 +332,6 @@ class TestAmphoraFlows(base.TestCase): self.assertEqual(12, len(amp_flow.provides)) def test_cert_rotate_amphora_flow(self): - cfg.CONF.set_override('amphora_driver', 'amphora_haproxy_rest_driver', - group='controller_worker') self.AmpFlow = amphora_flows.AmphoraFlows() amp_rotate_flow = self.AmpFlow.cert_rotate_amphora_flow() diff --git a/octavia/tests/unit/controller/worker/flows/test_listener_flows.py b/octavia/tests/unit/controller/worker/flows/test_listener_flows.py index ead6b41022..408e6f6590 100644 --- a/octavia/tests/unit/controller/worker/flows/test_listener_flows.py +++ b/octavia/tests/unit/controller/worker/flows/test_listener_flows.py @@ -76,3 +76,12 @@ class TestListenerFlows(base.TestCase): self.assertEqual(4, len(listener_flow.requires)) self.assertEqual(0, len(listener_flow.provides)) + + def test_get_create_all_listeners_flow(self): + listeners_flow = self.ListenerFlow.get_create_all_listeners_flow() + self.assertIsInstance(listeners_flow, flow.Flow) + self.assertIn(constants.LOADBALANCER, listeners_flow.requires) + self.assertIn(constants.LOADBALANCER_ID, listeners_flow.requires) + self.assertIn(constants.LOADBALANCER, listeners_flow.provides) + self.assertEqual(2, len(listeners_flow.requires)) + self.assertEqual(2, len(listeners_flow.provides)) diff --git a/octavia/tests/unit/controller/worker/flows/test_load_balancer_flows.py b/octavia/tests/unit/controller/worker/flows/test_load_balancer_flows.py index ea6e891442..3d60e79c51 100644 --- a/octavia/tests/unit/controller/worker/flows/test_load_balancer_flows.py +++ b/octavia/tests/unit/controller/worker/flows/test_load_balancer_flows.py @@ -27,10 +27,15 @@ import octavia.tests.unit.base as base class TestLoadBalancerFlows(base.TestCase): def setUp(self): + super(TestLoadBalancerFlows, self).setUp() + old_amp_driver = cfg.CONF.controller_worker.amphora_driver + cfg.CONF.set_override('amphora_driver', 'amphora_haproxy_rest_driver', + group='controller_worker') self.LBFlow = load_balancer_flows.LoadBalancerFlows() conf = oslo_fixture.Config(cfg.CONF) conf.config(group="nova", enable_anti_affinity=False) - super(TestLoadBalancerFlows, self).setUp() + self.addCleanup(cfg.CONF.set_override, 'amphora_driver', + old_amp_driver, group='controller_worker') def test_get_create_load_balancer_flow(self): amp_flow = self.LBFlow.get_create_load_balancer_flow( @@ -164,3 +169,81 @@ class TestLoadBalancerFlows(base.TestCase): self.assertEqual(4, len(amp_flow.provides)) self.assertEqual(2, len(amp_flow.requires)) + + # Test mark_active=False + amp_flow = self.LBFlow.get_post_lb_amp_association_flow( + '123', constants.TOPOLOGY_ACTIVE_STANDBY) + + self.assertIsInstance(amp_flow, flow.Flow) + + self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires) + self.assertIn(constants.UPDATE_DICT, amp_flow.requires) + self.assertIn(constants.LOADBALANCER, amp_flow.provides) + + self.assertEqual(4, len(amp_flow.provides)) + self.assertEqual(2, len(amp_flow.requires)) + + def test_get_create_load_balancer_graph_flows(self): + allocate_amp_flow, post_amp_flow = ( + self.LBFlow.get_create_load_balancer_graph_flows( + constants.TOPOLOGY_SINGLE, '123' + ) + ) + self.assertIsInstance(allocate_amp_flow, flow.Flow) + self.assertIn(constants.LOADBALANCER_ID, allocate_amp_flow.requires) + + self.assertIn(constants.AMPHORA, allocate_amp_flow.provides) + self.assertIn(constants.AMPHORA_ID, allocate_amp_flow.provides) + self.assertIn(constants.COMPUTE_ID, allocate_amp_flow.provides) + self.assertIn(constants.COMPUTE_OBJ, allocate_amp_flow.provides) + + self.assertEqual(1, len(allocate_amp_flow.requires)) + self.assertEqual(5, len(allocate_amp_flow.provides), + allocate_amp_flow.provides) + + self.assertIsInstance(post_amp_flow, flow.Flow) + self.assertIn(constants.LOADBALANCER_ID, post_amp_flow.requires) + self.assertIn(constants.UPDATE_DICT, post_amp_flow.requires) + + self.assertIn(constants.LOADBALANCER, post_amp_flow.provides) + self.assertIn(constants.DELTAS, post_amp_flow.provides) + self.assertIn(constants.ADDED_PORTS, post_amp_flow.provides) + self.assertIn(constants.VIP, post_amp_flow.provides) + self.assertIn(constants.AMPS_DATA, post_amp_flow.provides) + self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, + post_amp_flow.provides) + + self.assertEqual(2, len(post_amp_flow.requires)) + self.assertEqual(7, len(post_amp_flow.provides)) + + # Test Active/Standby + allocate_amp_flow, post_amp_flow = ( + self.LBFlow.get_create_load_balancer_graph_flows( + constants.TOPOLOGY_ACTIVE_STANDBY, '123' + ) + ) + self.assertIsInstance(allocate_amp_flow, flow.Flow) + self.assertIn(constants.LOADBALANCER_ID, allocate_amp_flow.requires) + + self.assertIn(constants.AMPHORA, allocate_amp_flow.provides) + self.assertIn(constants.AMPHORA_ID, allocate_amp_flow.provides) + self.assertIn(constants.COMPUTE_ID, allocate_amp_flow.provides) + self.assertIn(constants.COMPUTE_OBJ, allocate_amp_flow.provides) + + self.assertEqual(1, len(allocate_amp_flow.requires)) + self.assertEqual(5, len(allocate_amp_flow.provides)) + + self.assertIsInstance(post_amp_flow, flow.Flow) + self.assertIn(constants.LOADBALANCER_ID, post_amp_flow.requires) + self.assertIn(constants.UPDATE_DICT, post_amp_flow.requires) + + self.assertIn(constants.LOADBALANCER, post_amp_flow.provides) + self.assertIn(constants.DELTAS, post_amp_flow.provides) + self.assertIn(constants.ADDED_PORTS, post_amp_flow.provides) + self.assertIn(constants.VIP, post_amp_flow.provides) + self.assertIn(constants.AMPS_DATA, post_amp_flow.provides) + self.assertIn(constants.AMPHORAE_NETWORK_CONFIG, + post_amp_flow.provides) + + self.assertEqual(2, len(post_amp_flow.requires)) + self.assertEqual(7, len(post_amp_flow.provides)) diff --git a/octavia/tests/unit/controller/worker/tasks/test_amphora_driver_tasks.py b/octavia/tests/unit/controller/worker/tasks/test_amphora_driver_tasks.py index c2a454d017..99db383a46 100644 --- a/octavia/tests/unit/controller/worker/tasks/test_amphora_driver_tasks.py +++ b/octavia/tests/unit/controller/worker/tasks/test_amphora_driver_tasks.py @@ -18,6 +18,7 @@ from oslo_utils import uuidutils from taskflow.types import failure from octavia.common import constants +from octavia.common import data_models from octavia.controller.worker.tasks import amphora_driver_tasks from octavia.db import repositories as repo import octavia.tests.unit.base as base @@ -31,10 +32,11 @@ LB_ID = uuidutils.generate_uuid() _amphora_mock = mock.MagicMock() _amphora_mock.id = AMP_ID _amphora_mock.status = constants.AMPHORA_ALLOCATED -_listener_mock = mock.MagicMock() -_listener_mock.id = LISTENER_ID _load_balancer_mock = mock.MagicMock() _load_balancer_mock.id = LB_ID +_listener_mock = mock.MagicMock() +_listener_mock.id = LISTENER_ID +_load_balancer_mock.listeners = [_listener_mock] _vip_mock = mock.MagicMock() _load_balancer_mock.vip = _vip_mock _LB_mock = mock.MagicMock() @@ -76,13 +78,45 @@ class TestAmphoraDriverTasks(base.TestCase): mock_driver.update.assert_called_once_with(_listener_mock, _vip_mock) # Test the revert - amp = listener_update_obj.revert([_listener_mock]) + amp = listener_update_obj.revert(_load_balancer_mock) repo.ListenerRepository.update.assert_called_once_with( _session_mock, id=LISTENER_ID, provisioning_status=constants.ERROR) self.assertIsNone(amp) + def test_listeners_update(self, + mock_driver, + mock_generate_uuid, + mock_log, + mock_get_session, + mock_listener_repo_get, + mock_listener_repo_update, + mock_amphora_repo_update): + listeners_update_obj = amphora_driver_tasks.ListenersUpdate() + listeners = [data_models.Listener(id='listener1'), + data_models.Listener(id='listener2')] + vip = data_models.Vip(ip_address='10.0.0.1') + lb = data_models.LoadBalancer(id='lb1', listeners=listeners, vip=vip) + listeners_update_obj.execute(lb, listeners) + mock_driver.update.assert_has_calls([mock.call(listeners[0], vip), + mock.call(listeners[1], vip)]) + self.assertEqual(2, mock_driver.update.call_count) + self.assertIsNotNone(listeners[0].load_balancer) + self.assertIsNotNone(listeners[1].load_balancer) + + # Test the revert + amp = listeners_update_obj.revert(lb) + expected_db_calls = [mock.call(_session_mock, + id=listeners[0].id, + provisioning_status=constants.ERROR), + mock.call(_session_mock, + id=listeners[1].id, + provisioning_status=constants.ERROR)] + repo.ListenerRepository.update.has_calls(expected_db_calls) + self.assertEqual(2, repo.ListenerRepository.update.call_count) + self.assertIsNone(amp) + def test_listener_stop(self, mock_driver, mock_generate_uuid, diff --git a/octavia/tests/unit/controller/worker/tasks/test_database_tasks.py b/octavia/tests/unit/controller/worker/tasks/test_database_tasks.py index a2258697f0..86b2ed9c78 100644 --- a/octavia/tests/unit/controller/worker/tasks/test_database_tasks.py +++ b/octavia/tests/unit/controller/worker/tasks/test_database_tasks.py @@ -22,6 +22,7 @@ from sqlalchemy.orm import exc from taskflow.types import failure from octavia.common import constants +from octavia.common import data_models from octavia.controller.worker.tasks import database_tasks from octavia.db import repositories as repo import octavia.tests.unit.base as base @@ -932,6 +933,7 @@ class TestDatabaseTasks(base.TestCase): 'TEST', LB_ID, provisioning_status=constants.ACTIVE) + self.assertEqual(0, repo.ListenerRepository.update.call_count) # Test the revert @@ -942,6 +944,47 @@ class TestDatabaseTasks(base.TestCase): 'TEST', LB_ID, provisioning_status=constants.ERROR) + self.assertEqual(0, repo.ListenerRepository.update.call_count) + + def test_mark_LB_active_in_db_and_listeners(self, + mock_generate_uuid, + mock_LOG, + mock_get_session, + mock_loadbalancer_repo_update, + mock_listener_repo_update, + mock_amphora_repo_update, + mock_amphora_repo_delete): + listeners = [data_models.Listener(id='listener1'), + data_models.Listener(id='listener2')] + lb = data_models.LoadBalancer(id=LB_ID, listeners=listeners) + mark_lb_active = database_tasks.MarkLBActiveInDB(mark_listeners=True) + mark_lb_active.execute(lb) + + repo.LoadBalancerRepository.update.assert_called_once_with( + 'TEST', + lb.id, + provisioning_status=constants.ACTIVE) + self.assertEqual(2, repo.ListenerRepository.update.call_count) + repo.ListenerRepository.update.has_calls( + [mock.call('TEST', listeners[0].id, + provisioning_status=constants.ACTIVE), + mock.call('TEST', listeners[1].id, + provisioning_status=constants.ACTIVE)]) + + mock_loadbalancer_repo_update.reset_mock() + mock_listener_repo_update.reset_mock() + mark_lb_active.revert(lb) + + repo.LoadBalancerRepository.update.assert_called_once_with( + 'TEST', + lb.id, + provisioning_status=constants.ERROR) + self.assertEqual(2, repo.ListenerRepository.update.call_count) + repo.ListenerRepository.update.has_calls( + [mock.call('TEST', listeners[0].id, + provisioning_status=constants.ERROR), + mock.call('TEST', listeners[1].id, + provisioning_status=constants.ERROR)]) def test_mark_LB_deleted_in_db(self, mock_generate_uuid, @@ -1405,4 +1448,4 @@ class TestDatabaseTasks(base.TestCase): # Test the revert mock_listener_repo_update.reset_mock() - update_server_group_info.revert(LB_ID, SERVER_GROUP_ID) + update_server_group_info.revert(LB_ID, SERVER_GROUP_ID) \ No newline at end of file diff --git a/octavia/tests/unit/controller/worker/test_controller_worker.py b/octavia/tests/unit/controller/worker/test_controller_worker.py index 110c30d5ba..2de93aa548 100644 --- a/octavia/tests/unit/controller/worker/test_controller_worker.py +++ b/octavia/tests/unit/controller/worker/test_controller_worker.py @@ -19,6 +19,7 @@ from oslo_utils import uuidutils from octavia.common import base_taskflow from octavia.common import constants +from octavia.common import data_models from octavia.controller.worker import controller_worker import octavia.tests.unit.base as base @@ -403,6 +404,7 @@ class TestControllerWorker(base.TestCase): mock_get_get_post_lb_amp_association_flow.return_value = _post_flow store = {constants.LOADBALANCER_ID: LB_ID, 'update_dict': {'topology': 'SINGLE'}} + setattr(mock_lb_repo_get.return_value, 'listeners', []) cw = controller_worker.ControllerWorker() cw.create_load_balancer(LB_ID) @@ -440,6 +442,91 @@ class TestControllerWorker(base.TestCase): mock_eng.run.assert_any_call() mock_eng_post.run.assert_any_call() + @mock.patch('octavia.controller.worker.flows.load_balancer_flows.' + 'LoadBalancerFlows.get_post_lb_amp_association_flow') + @mock.patch('octavia.controller.worker.flows.load_balancer_flows.' + 'LoadBalancerFlows.get_create_load_balancer_flow') + @mock.patch('octavia.controller.worker.flows.load_balancer_flows.' + 'LoadBalancerFlows.get_create_load_balancer_graph_flows') + def test_create_load_balancer_full_graph( + self, + mock_get_create_load_balancer_graph_flows, + mock_get_create_load_balancer_flow, + mock_get_post_lb_amp_association_flow, + mock_api_get_session, + mock_dyn_log_listener, + mock_taskflow_load, + mock_pool_repo_get, + mock_member_repo_get, + mock_l7rule_repo_get, + mock_l7policy_repo_get, + mock_listener_repo_get, + mock_lb_repo_get, + mock_health_mon_repo_get, + mock_amp_repo_get): + CONF.set_override(group='controller_worker', + name='loadbalancer_topology', + override=constants.TOPOLOGY_SINGLE, + enforce_type=True) + listeners = [data_models.Listener(id='listener1'), + data_models.Listener(id='listener2')] + lb = data_models.LoadBalancer(id=LB_ID, listeners=listeners) + mock_lb_repo_get.return_value = lb + mock_eng = mock.Mock() + mock_eng_post = mock.Mock() + mock_taskflow_load.side_effect = [mock_eng, mock_eng_post] + _post_flow = mock.MagicMock() + mock_get_create_load_balancer_graph_flows.return_value = ( + _flow_mock, _post_flow + ) + store = {constants.LOADBALANCER_ID: LB_ID, + 'update_dict': {'topology': 'SINGLE'}} + + cw = controller_worker.ControllerWorker() + cw.create_load_balancer(LB_ID) + + calls = [mock.call(_flow_mock, store=store), + mock.call(_post_flow, store=store)] + mock_taskflow_load.assert_has_calls(calls, any_order=True) + mock_eng.run.assert_any_call() + mock_eng_post.run.assert_any_call() + mock_get_create_load_balancer_graph_flows.assert_called_once_with( + 'SINGLE', 'post-amphora-association' + ) + self.assertFalse(mock_get_create_load_balancer_flow.called) + self.assertFalse(mock_get_post_lb_amp_association_flow.called) + + # Test code path for active standby full lb graph creation + CONF.set_override(group='controller_worker', + name='loadbalancer_topology', + override=constants.TOPOLOGY_ACTIVE_STANDBY) + _flow_mock.reset_mock() + mock_get_create_load_balancer_graph_flows.reset_mock() + mock_taskflow_load.reset_mock() + mock_eng = mock.Mock() + mock_eng_post = mock.Mock() + mock_taskflow_load.side_effect = [mock_eng, mock_eng_post] + _post_flow = mock.MagicMock() + mock_get_create_load_balancer_graph_flows.return_value = ( + _flow_mock, _post_flow + ) + store = {constants.LOADBALANCER_ID: LB_ID, + 'update_dict': {'topology': 'ACTIVE_STANDBY'}} + + cw = controller_worker.ControllerWorker() + cw.create_load_balancer(LB_ID) + + calls = [mock.call(_flow_mock, store=store), + mock.call(_post_flow, store=store)] + mock_taskflow_load.assert_has_calls(calls, any_order=True) + mock_eng.run.assert_any_call() + mock_eng_post.run.assert_any_call() + mock_get_create_load_balancer_graph_flows.assert_called_once_with( + 'ACTIVE_STANDBY', 'post-amphora-association' + ) + self.assertFalse(mock_get_create_load_balancer_flow.called) + self.assertFalse(mock_get_post_lb_amp_association_flow.called) + @mock.patch('octavia.controller.worker.flows.load_balancer_flows.' 'LoadBalancerFlows.get_delete_load_balancer_flow', return_value=(_flow_mock, {'test': 'test'}))