From fc4c634d7c828ed55e4ea1b70049e867fdc27b8a Mon Sep 17 00:00:00 2001 From: Michael Johnson Date: Mon, 9 Dec 2019 16:58:16 -0800 Subject: [PATCH] Update ListenersUpdate for lb/listener dicts This patch updates the ListersUpdate task to accept a load balancer ID instead of a load balancer database object as part of the job board work. It also removes a duplicate test that is no longer needed. Change-Id: I819d44a37cba3ef952d109a675ae6d55e3eb4aa1 --- .../controller/worker/v2/controller_worker.py | 2 +- .../worker/v2/flows/health_monitor_flows.py | 6 +-- .../worker/v2/flows/l7policy_flows.py | 6 +-- .../worker/v2/flows/l7rule_flows.py | 6 +-- .../worker/v2/flows/listener_flows.py | 6 +-- .../worker/v2/flows/load_balancer_flows.py | 2 +- .../worker/v2/flows/member_flows.py | 8 ++-- .../controller/worker/v2/flows/pool_flows.py | 6 +-- .../worker/v2/tasks/amphora_driver_tasks.py | 15 +++++-- .../worker/v2/flows/test_listener_flows.py | 6 +-- .../v2/flows/test_load_balancer_flows.py | 2 +- .../v2/tasks/test_amphora_driver_tasks.py | 42 ++----------------- .../worker/v2/test_controller_worker.py | 4 +- 13 files changed, 41 insertions(+), 70 deletions(-) diff --git a/octavia/controller/worker/v2/controller_worker.py b/octavia/controller/worker/v2/controller_worker.py index fc0061a42a..42f5c7d31b 100644 --- a/octavia/controller/worker/v2/controller_worker.py +++ b/octavia/controller/worker/v2/controller_worker.py @@ -296,7 +296,6 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): store={constants.LISTENER: listener, constants.UPDATE_DICT: listener_updates, constants.LOADBALANCER_ID: db_lb.id, - constants.LOADBALANCER: db_lb, constants.LISTENERS: [listener]}) with tf_logging.DynamicLoggingListener(update_listener_tf, log=LOG): update_listener_tf.run() @@ -389,6 +388,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): update_lb_tf = self._taskflow_load( self._lb_flows.get_update_load_balancer_flow(), store={constants.LOADBALANCER: lb, + constants.LOADBALANCER_ID: lb.id, constants.UPDATE_DICT: load_balancer_updates}) with tf_logging.DynamicLoggingListener(update_lb_tf, diff --git a/octavia/controller/worker/v2/flows/health_monitor_flows.py b/octavia/controller/worker/v2/flows/health_monitor_flows.py index fc6759f755..cbc7281f4f 100644 --- a/octavia/controller/worker/v2/flows/health_monitor_flows.py +++ b/octavia/controller/worker/v2/flows/health_monitor_flows.py @@ -36,7 +36,7 @@ class HealthMonitorFlows(object): create_hm_flow.add(database_tasks.MarkHealthMonitorPendingCreateInDB( requires=constants.HEALTH_MON)) create_hm_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) create_hm_flow.add(database_tasks.MarkHealthMonitorActiveInDB( requires=constants.HEALTH_MON)) create_hm_flow.add(database_tasks.MarkPoolActiveInDB( @@ -59,7 +59,7 @@ class HealthMonitorFlows(object): delete_hm_flow.add(database_tasks.MarkHealthMonitorPendingDeleteInDB( requires=constants.HEALTH_MON)) delete_hm_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) delete_hm_flow.add(database_tasks.DeleteHealthMonitorInDB( requires=constants.HEALTH_MON)) delete_hm_flow.add(database_tasks.DecrementHealthMonitorQuota( @@ -88,7 +88,7 @@ class HealthMonitorFlows(object): update_hm_flow.add(database_tasks.MarkHealthMonitorPendingUpdateInDB( requires=constants.HEALTH_MON)) update_hm_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) update_hm_flow.add(database_tasks.UpdateHealthMonInDB( requires=[constants.HEALTH_MON, constants.UPDATE_DICT])) update_hm_flow.add(database_tasks.MarkHealthMonitorActiveInDB( diff --git a/octavia/controller/worker/v2/flows/l7policy_flows.py b/octavia/controller/worker/v2/flows/l7policy_flows.py index 4de89836c6..22b2642030 100644 --- a/octavia/controller/worker/v2/flows/l7policy_flows.py +++ b/octavia/controller/worker/v2/flows/l7policy_flows.py @@ -36,7 +36,7 @@ class L7PolicyFlows(object): create_l7policy_flow.add(database_tasks.MarkL7PolicyPendingCreateInDB( requires=constants.L7POLICY)) create_l7policy_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) create_l7policy_flow.add(database_tasks.MarkL7PolicyActiveInDB( requires=constants.L7POLICY)) create_l7policy_flow.add(database_tasks.MarkLBAndListenersActiveInDB( @@ -57,7 +57,7 @@ class L7PolicyFlows(object): delete_l7policy_flow.add(database_tasks.MarkL7PolicyPendingDeleteInDB( requires=constants.L7POLICY)) delete_l7policy_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) delete_l7policy_flow.add(database_tasks.DeleteL7PolicyInDB( requires=constants.L7POLICY)) delete_l7policy_flow.add(database_tasks.MarkLBAndListenersActiveInDB( @@ -78,7 +78,7 @@ class L7PolicyFlows(object): update_l7policy_flow.add(database_tasks.MarkL7PolicyPendingUpdateInDB( requires=constants.L7POLICY)) update_l7policy_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) update_l7policy_flow.add(database_tasks.UpdateL7PolicyInDB( requires=[constants.L7POLICY, constants.UPDATE_DICT])) update_l7policy_flow.add(database_tasks.MarkL7PolicyActiveInDB( diff --git a/octavia/controller/worker/v2/flows/l7rule_flows.py b/octavia/controller/worker/v2/flows/l7rule_flows.py index 33687c37d7..57e9200fb6 100644 --- a/octavia/controller/worker/v2/flows/l7rule_flows.py +++ b/octavia/controller/worker/v2/flows/l7rule_flows.py @@ -36,7 +36,7 @@ class L7RuleFlows(object): create_l7rule_flow.add(database_tasks.MarkL7RulePendingCreateInDB( requires=constants.L7RULE)) create_l7rule_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) create_l7rule_flow.add(database_tasks.MarkL7RuleActiveInDB( requires=constants.L7RULE)) create_l7rule_flow.add(database_tasks.MarkL7PolicyActiveInDB( @@ -59,7 +59,7 @@ class L7RuleFlows(object): delete_l7rule_flow.add(database_tasks.MarkL7RulePendingDeleteInDB( requires=constants.L7RULE)) delete_l7rule_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) delete_l7rule_flow.add(database_tasks.DeleteL7RuleInDB( requires=constants.L7RULE)) delete_l7rule_flow.add(database_tasks.MarkL7PolicyActiveInDB( @@ -82,7 +82,7 @@ class L7RuleFlows(object): update_l7rule_flow.add(database_tasks.MarkL7RulePendingUpdateInDB( requires=constants.L7RULE)) update_l7rule_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) update_l7rule_flow.add(database_tasks.UpdateL7RuleInDB( requires=[constants.L7RULE, constants.UPDATE_DICT])) update_l7rule_flow.add(database_tasks.MarkL7RuleActiveInDB( diff --git a/octavia/controller/worker/v2/flows/listener_flows.py b/octavia/controller/worker/v2/flows/listener_flows.py index 5d26dc1195..f8ae183102 100644 --- a/octavia/controller/worker/v2/flows/listener_flows.py +++ b/octavia/controller/worker/v2/flows/listener_flows.py @@ -33,7 +33,7 @@ class ListenerFlows(object): create_listener_flow.add(lifecycle_tasks.ListenersToErrorOnRevertTask( requires=constants.LISTENERS)) create_listener_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) create_listener_flow.add(network_tasks.UpdateVIP( requires=constants.LISTENERS)) create_listener_flow.add(database_tasks. @@ -57,7 +57,7 @@ class ListenerFlows(object): requires=constants.LOADBALANCER_ID, provides=constants.LOADBALANCER)) create_all_listeners_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) create_all_listeners_flow.add(network_tasks.UpdateVIP( requires=constants.LISTENERS)) return create_all_listeners_flow @@ -114,7 +114,7 @@ class ListenerFlows(object): update_listener_flow.add(lifecycle_tasks.ListenerToErrorOnRevertTask( requires=constants.LISTENER)) update_listener_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) update_listener_flow.add(network_tasks.UpdateVIP( requires=constants.LISTENERS)) update_listener_flow.add(database_tasks.UpdateListenerInDB( diff --git a/octavia/controller/worker/v2/flows/load_balancer_flows.py b/octavia/controller/worker/v2/flows/load_balancer_flows.py index f7231737b5..04e28f8e39 100644 --- a/octavia/controller/worker/v2/flows/load_balancer_flows.py +++ b/octavia/controller/worker/v2/flows/load_balancer_flows.py @@ -338,7 +338,7 @@ class LoadBalancerFlows(object): update_LB_flow.add(network_tasks.ApplyQos( requires=(constants.LOADBALANCER, constants.UPDATE_DICT))) update_LB_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) update_LB_flow.add(database_tasks.UpdateLoadbalancerInDB( requires=[constants.LOADBALANCER, constants.UPDATE_DICT])) update_LB_flow.add(database_tasks.MarkLBActiveInDB( diff --git a/octavia/controller/worker/v2/flows/member_flows.py b/octavia/controller/worker/v2/flows/member_flows.py index 6d12991539..b97c4e5c1d 100644 --- a/octavia/controller/worker/v2/flows/member_flows.py +++ b/octavia/controller/worker/v2/flows/member_flows.py @@ -47,7 +47,7 @@ class MemberFlows(object): requires=(constants.LOADBALANCER, constants.ADDED_PORTS) )) create_member_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) create_member_flow.add(database_tasks.MarkMemberActiveInDB( requires=constants.MEMBER)) create_member_flow.add(database_tasks.MarkPoolActiveInDB( @@ -73,7 +73,7 @@ class MemberFlows(object): delete_member_flow.add(database_tasks.MarkMemberPendingDeleteInDB( requires=constants.MEMBER)) delete_member_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) delete_member_flow.add(database_tasks.DeleteMemberInDB( requires=constants.MEMBER)) delete_member_flow.add(database_tasks.DecrementMemberQuota( @@ -101,7 +101,7 @@ class MemberFlows(object): update_member_flow.add(database_tasks.MarkMemberPendingUpdateInDB( requires=constants.MEMBER)) update_member_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) update_member_flow.add(database_tasks.UpdateMemberInDB( requires=[constants.MEMBER, constants.UPDATE_DICT])) update_member_flow.add(database_tasks.MarkMemberActiveInDB( @@ -190,7 +190,7 @@ class MemberFlows(object): # Update the Listener (this makes the changes active on the Amp) batch_update_members_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) # Mark all the members ACTIVE here, then pool then LB/Listeners batch_update_members_flow.add(unordered_members_active_flow) diff --git a/octavia/controller/worker/v2/flows/pool_flows.py b/octavia/controller/worker/v2/flows/pool_flows.py index a26928c231..b055cba74b 100644 --- a/octavia/controller/worker/v2/flows/pool_flows.py +++ b/octavia/controller/worker/v2/flows/pool_flows.py @@ -36,7 +36,7 @@ class PoolFlows(object): create_pool_flow.add(database_tasks.MarkPoolPendingCreateInDB( requires=constants.POOL_ID)) create_pool_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) create_pool_flow.add(database_tasks.MarkPoolActiveInDB( requires=constants.POOL_ID)) create_pool_flow.add(database_tasks.MarkLBAndListenersActiveInDB( @@ -59,7 +59,7 @@ class PoolFlows(object): delete_pool_flow.add(database_tasks.CountPoolChildrenForQuota( requires=constants.POOL_ID, provides=constants.POOL_CHILD_COUNT)) delete_pool_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) delete_pool_flow.add(database_tasks.DeletePoolInDB( requires=constants.POOL_ID)) delete_pool_flow.add(database_tasks.DecrementPoolQuota( @@ -109,7 +109,7 @@ class PoolFlows(object): update_pool_flow.add(database_tasks.MarkPoolPendingUpdateInDB( requires=constants.POOL_ID)) update_pool_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=constants.LOADBALANCER)) + requires=constants.LOADBALANCER_ID)) update_pool_flow.add(database_tasks.UpdatePoolInDB( requires=[constants.POOL_ID, constants.UPDATE_DICT])) update_pool_flow.add(database_tasks.MarkPoolActiveInDB( diff --git a/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py b/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py index 8174b10b8d..7cf7138d7a 100644 --- a/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py +++ b/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py @@ -92,15 +92,22 @@ class AmpListenersUpdate(BaseAmphoraTask): class ListenersUpdate(BaseAmphoraTask): """Task to update amphora with all specified listeners' configurations.""" - def execute(self, loadbalancer): + def execute(self, loadbalancer_id): """Execute updates per listener for an amphora.""" - self.amphora_driver.update(loadbalancer) + loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(), + id=loadbalancer_id) + if loadbalancer: + self.amphora_driver.update(loadbalancer) + else: + LOG.error('Load balancer %s for listeners update not found. ' + 'Skipping update.', loadbalancer_id) - def revert(self, loadbalancer, *args, **kwargs): + def revert(self, loadbalancer_id, *args, **kwargs): """Handle failed listeners updates.""" LOG.warning("Reverting listeners updates.") - + loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(), + id=loadbalancer_id) for listener in loadbalancer.listeners: self.task_utils.mark_listener_prov_status_error( listener.id) diff --git a/octavia/tests/unit/controller/worker/v2/flows/test_listener_flows.py b/octavia/tests/unit/controller/worker/v2/flows/test_listener_flows.py index fe0c7294e5..e497ecffcd 100644 --- a/octavia/tests/unit/controller/worker/v2/flows/test_listener_flows.py +++ b/octavia/tests/unit/controller/worker/v2/flows/test_listener_flows.py @@ -38,9 +38,8 @@ class TestListenerFlows(base.TestCase): self.assertIn(constants.LISTENERS, listener_flow.requires) self.assertIn(constants.LOADBALANCER_ID, listener_flow.requires) - self.assertIn(constants.LOADBALANCER, listener_flow.requires) - self.assertEqual(3, len(listener_flow.requires)) + self.assertEqual(2, len(listener_flow.requires)) self.assertEqual(0, len(listener_flow.provides)) def test_get_delete_listener_flow(self, mock_get_net_driver): @@ -78,9 +77,8 @@ class TestListenerFlows(base.TestCase): self.assertIn(constants.UPDATE_DICT, listener_flow.requires) self.assertIn(constants.LISTENERS, listener_flow.requires) self.assertIn(constants.LOADBALANCER_ID, listener_flow.requires) - self.assertIn(constants.LOADBALANCER, listener_flow.requires) - self.assertEqual(5, len(listener_flow.requires)) + self.assertEqual(4, len(listener_flow.requires)) self.assertEqual(0, len(listener_flow.provides)) def test_get_create_all_listeners_flow(self, mock_get_net_driver): diff --git a/octavia/tests/unit/controller/worker/v2/flows/test_load_balancer_flows.py b/octavia/tests/unit/controller/worker/v2/flows/test_load_balancer_flows.py index 12c90bdb77..ed5f4f6e2b 100644 --- a/octavia/tests/unit/controller/worker/v2/flows/test_load_balancer_flows.py +++ b/octavia/tests/unit/controller/worker/v2/flows/test_load_balancer_flows.py @@ -137,7 +137,7 @@ class TestLoadBalancerFlows(base.TestCase): self.assertIn(constants.UPDATE_DICT, lb_flow.requires) self.assertEqual(0, len(lb_flow.provides)) - self.assertEqual(2, len(lb_flow.requires)) + self.assertEqual(3, len(lb_flow.requires)) def test_get_post_lb_amp_association_flow(self, mock_get_net_driver): amp_flow = self.LBFlow.get_post_lb_amp_association_flow( diff --git a/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py b/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py index 4d3a5a9464..eed0abee77 100644 --- a/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py +++ b/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py @@ -107,40 +107,6 @@ class TestAmphoraDriverTasks(base.TestCase): mock_amphora_repo_update.assert_called_once_with( _session_mock, AMP_ID, status=constants.ERROR) - @mock.patch('octavia.db.repositories.LoadBalancerRepository.get') - def test_listener_update(self, - mock_lb_get, - mock_driver, - mock_generate_uuid, - mock_log, - mock_get_session, - mock_listener_repo_get, - mock_listener_repo_update, - mock_amphora_repo_update): - - listener_update_obj = amphora_driver_tasks.ListenersUpdate() - listener_update_obj.execute(_load_balancer_mock) - - mock_driver.update.assert_called_once_with(_load_balancer_mock) - - # Test the revert - amp = listener_update_obj.revert(_load_balancer_mock) - repo.ListenerRepository.update.assert_called_once_with( - _session_mock, - id=LISTENER_ID, - provisioning_status=constants.ERROR) - self.assertIsNone(amp) - - # Test the revert with exception - repo.ListenerRepository.update.reset_mock() - mock_listener_repo_update.side_effect = Exception('fail') - amp = listener_update_obj.revert(_load_balancer_mock) - repo.ListenerRepository.update.assert_called_once_with( - _session_mock, - id=LISTENER_ID, - provisioning_status=constants.ERROR) - self.assertIsNone(amp) - @mock.patch('octavia.db.repositories.LoadBalancerRepository.get') def test_listeners_update(self, mock_lb_get, @@ -157,15 +123,15 @@ class TestAmphoraDriverTasks(base.TestCase): data_models.Listener(id='listener2')] vip = data_models.Vip(ip_address='10.0.0.1') lb = data_models.LoadBalancer(id=LB_ID, listeners=listeners, vip=vip) - mock_lb_get.return_value = lb - listeners_update_obj.execute(lb) + mock_lb_get.side_effect = [lb, None, lb] + listeners_update_obj.execute(lb.id) mock_driver.update.assert_called_once_with(lb) self.assertEqual(1, mock_driver.update.call_count) - mock_lb_get.reset_mock() + mock_driver.update.reset_mock() listeners_update_obj.execute(None) - mock_lb_get.assert_not_called() + mock_driver.update.assert_not_called() # Test the revert amp = listeners_update_obj.revert(lb) diff --git a/octavia/tests/unit/controller/worker/v2/test_controller_worker.py b/octavia/tests/unit/controller/worker/v2/test_controller_worker.py index 4777d8915d..07779840d8 100644 --- a/octavia/tests/unit/controller/worker/v2/test_controller_worker.py +++ b/octavia/tests/unit/controller/worker/v2/test_controller_worker.py @@ -411,8 +411,6 @@ class TestControllerWorker(base.TestCase): constants.UPDATE_DICT: LISTENER_UPDATE_DICT, constants.LOADBALANCER_ID: LB_ID, - constants.LOADBALANCER: - _load_balancer_mock, constants.LISTENERS: [listener_dict]})) @@ -718,6 +716,8 @@ class TestControllerWorker(base.TestCase): store={constants.UPDATE_DICT: change, constants.LOADBALANCER: _load_balancer_mock, + constants.LOADBALANCER_ID: + _load_balancer_mock.id, })) _flow_mock.run.assert_called_once_with()