Update ListenersUpdate for lb/listener dicts

This patch updates the ListersUpdate task to accept a load balancer
ID instead of a load balancer database object as part of the job board
work.
It also removes a duplicate test that is no longer needed.

Change-Id: I819d44a37cba3ef952d109a675ae6d55e3eb4aa1
This commit is contained in:
Michael Johnson 2019-12-09 16:58:16 -08:00
parent a07de8efb7
commit fc4c634d7c
13 changed files with 41 additions and 70 deletions

View File

@ -296,7 +296,6 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
store={constants.LISTENER: listener,
constants.UPDATE_DICT: listener_updates,
constants.LOADBALANCER_ID: db_lb.id,
constants.LOADBALANCER: db_lb,
constants.LISTENERS: [listener]})
with tf_logging.DynamicLoggingListener(update_listener_tf, log=LOG):
update_listener_tf.run()
@ -389,6 +388,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
update_lb_tf = self._taskflow_load(
self._lb_flows.get_update_load_balancer_flow(),
store={constants.LOADBALANCER: lb,
constants.LOADBALANCER_ID: lb.id,
constants.UPDATE_DICT: load_balancer_updates})
with tf_logging.DynamicLoggingListener(update_lb_tf,

View File

@ -36,7 +36,7 @@ class HealthMonitorFlows(object):
create_hm_flow.add(database_tasks.MarkHealthMonitorPendingCreateInDB(
requires=constants.HEALTH_MON))
create_hm_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
create_hm_flow.add(database_tasks.MarkHealthMonitorActiveInDB(
requires=constants.HEALTH_MON))
create_hm_flow.add(database_tasks.MarkPoolActiveInDB(
@ -59,7 +59,7 @@ class HealthMonitorFlows(object):
delete_hm_flow.add(database_tasks.MarkHealthMonitorPendingDeleteInDB(
requires=constants.HEALTH_MON))
delete_hm_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
delete_hm_flow.add(database_tasks.DeleteHealthMonitorInDB(
requires=constants.HEALTH_MON))
delete_hm_flow.add(database_tasks.DecrementHealthMonitorQuota(
@ -88,7 +88,7 @@ class HealthMonitorFlows(object):
update_hm_flow.add(database_tasks.MarkHealthMonitorPendingUpdateInDB(
requires=constants.HEALTH_MON))
update_hm_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
update_hm_flow.add(database_tasks.UpdateHealthMonInDB(
requires=[constants.HEALTH_MON, constants.UPDATE_DICT]))
update_hm_flow.add(database_tasks.MarkHealthMonitorActiveInDB(

View File

@ -36,7 +36,7 @@ class L7PolicyFlows(object):
create_l7policy_flow.add(database_tasks.MarkL7PolicyPendingCreateInDB(
requires=constants.L7POLICY))
create_l7policy_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
create_l7policy_flow.add(database_tasks.MarkL7PolicyActiveInDB(
requires=constants.L7POLICY))
create_l7policy_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
@ -57,7 +57,7 @@ class L7PolicyFlows(object):
delete_l7policy_flow.add(database_tasks.MarkL7PolicyPendingDeleteInDB(
requires=constants.L7POLICY))
delete_l7policy_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
delete_l7policy_flow.add(database_tasks.DeleteL7PolicyInDB(
requires=constants.L7POLICY))
delete_l7policy_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
@ -78,7 +78,7 @@ class L7PolicyFlows(object):
update_l7policy_flow.add(database_tasks.MarkL7PolicyPendingUpdateInDB(
requires=constants.L7POLICY))
update_l7policy_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
update_l7policy_flow.add(database_tasks.UpdateL7PolicyInDB(
requires=[constants.L7POLICY, constants.UPDATE_DICT]))
update_l7policy_flow.add(database_tasks.MarkL7PolicyActiveInDB(

View File

@ -36,7 +36,7 @@ class L7RuleFlows(object):
create_l7rule_flow.add(database_tasks.MarkL7RulePendingCreateInDB(
requires=constants.L7RULE))
create_l7rule_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
create_l7rule_flow.add(database_tasks.MarkL7RuleActiveInDB(
requires=constants.L7RULE))
create_l7rule_flow.add(database_tasks.MarkL7PolicyActiveInDB(
@ -59,7 +59,7 @@ class L7RuleFlows(object):
delete_l7rule_flow.add(database_tasks.MarkL7RulePendingDeleteInDB(
requires=constants.L7RULE))
delete_l7rule_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
delete_l7rule_flow.add(database_tasks.DeleteL7RuleInDB(
requires=constants.L7RULE))
delete_l7rule_flow.add(database_tasks.MarkL7PolicyActiveInDB(
@ -82,7 +82,7 @@ class L7RuleFlows(object):
update_l7rule_flow.add(database_tasks.MarkL7RulePendingUpdateInDB(
requires=constants.L7RULE))
update_l7rule_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
update_l7rule_flow.add(database_tasks.UpdateL7RuleInDB(
requires=[constants.L7RULE, constants.UPDATE_DICT]))
update_l7rule_flow.add(database_tasks.MarkL7RuleActiveInDB(

View File

@ -33,7 +33,7 @@ class ListenerFlows(object):
create_listener_flow.add(lifecycle_tasks.ListenersToErrorOnRevertTask(
requires=constants.LISTENERS))
create_listener_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
create_listener_flow.add(network_tasks.UpdateVIP(
requires=constants.LISTENERS))
create_listener_flow.add(database_tasks.
@ -57,7 +57,7 @@ class ListenerFlows(object):
requires=constants.LOADBALANCER_ID,
provides=constants.LOADBALANCER))
create_all_listeners_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
create_all_listeners_flow.add(network_tasks.UpdateVIP(
requires=constants.LISTENERS))
return create_all_listeners_flow
@ -114,7 +114,7 @@ class ListenerFlows(object):
update_listener_flow.add(lifecycle_tasks.ListenerToErrorOnRevertTask(
requires=constants.LISTENER))
update_listener_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
update_listener_flow.add(network_tasks.UpdateVIP(
requires=constants.LISTENERS))
update_listener_flow.add(database_tasks.UpdateListenerInDB(

View File

@ -338,7 +338,7 @@ class LoadBalancerFlows(object):
update_LB_flow.add(network_tasks.ApplyQos(
requires=(constants.LOADBALANCER, constants.UPDATE_DICT)))
update_LB_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
update_LB_flow.add(database_tasks.UpdateLoadbalancerInDB(
requires=[constants.LOADBALANCER, constants.UPDATE_DICT]))
update_LB_flow.add(database_tasks.MarkLBActiveInDB(

View File

@ -47,7 +47,7 @@ class MemberFlows(object):
requires=(constants.LOADBALANCER, constants.ADDED_PORTS)
))
create_member_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
create_member_flow.add(database_tasks.MarkMemberActiveInDB(
requires=constants.MEMBER))
create_member_flow.add(database_tasks.MarkPoolActiveInDB(
@ -73,7 +73,7 @@ class MemberFlows(object):
delete_member_flow.add(database_tasks.MarkMemberPendingDeleteInDB(
requires=constants.MEMBER))
delete_member_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
delete_member_flow.add(database_tasks.DeleteMemberInDB(
requires=constants.MEMBER))
delete_member_flow.add(database_tasks.DecrementMemberQuota(
@ -101,7 +101,7 @@ class MemberFlows(object):
update_member_flow.add(database_tasks.MarkMemberPendingUpdateInDB(
requires=constants.MEMBER))
update_member_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
update_member_flow.add(database_tasks.UpdateMemberInDB(
requires=[constants.MEMBER, constants.UPDATE_DICT]))
update_member_flow.add(database_tasks.MarkMemberActiveInDB(
@ -190,7 +190,7 @@ class MemberFlows(object):
# Update the Listener (this makes the changes active on the Amp)
batch_update_members_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
# Mark all the members ACTIVE here, then pool then LB/Listeners
batch_update_members_flow.add(unordered_members_active_flow)

View File

@ -36,7 +36,7 @@ class PoolFlows(object):
create_pool_flow.add(database_tasks.MarkPoolPendingCreateInDB(
requires=constants.POOL_ID))
create_pool_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
create_pool_flow.add(database_tasks.MarkPoolActiveInDB(
requires=constants.POOL_ID))
create_pool_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
@ -59,7 +59,7 @@ class PoolFlows(object):
delete_pool_flow.add(database_tasks.CountPoolChildrenForQuota(
requires=constants.POOL_ID, provides=constants.POOL_CHILD_COUNT))
delete_pool_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
delete_pool_flow.add(database_tasks.DeletePoolInDB(
requires=constants.POOL_ID))
delete_pool_flow.add(database_tasks.DecrementPoolQuota(
@ -109,7 +109,7 @@ class PoolFlows(object):
update_pool_flow.add(database_tasks.MarkPoolPendingUpdateInDB(
requires=constants.POOL_ID))
update_pool_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
update_pool_flow.add(database_tasks.UpdatePoolInDB(
requires=[constants.POOL_ID, constants.UPDATE_DICT]))
update_pool_flow.add(database_tasks.MarkPoolActiveInDB(

View File

@ -92,15 +92,22 @@ class AmpListenersUpdate(BaseAmphoraTask):
class ListenersUpdate(BaseAmphoraTask):
"""Task to update amphora with all specified listeners' configurations."""
def execute(self, loadbalancer):
def execute(self, loadbalancer_id):
"""Execute updates per listener for an amphora."""
self.amphora_driver.update(loadbalancer)
loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(),
id=loadbalancer_id)
if loadbalancer:
self.amphora_driver.update(loadbalancer)
else:
LOG.error('Load balancer %s for listeners update not found. '
'Skipping update.', loadbalancer_id)
def revert(self, loadbalancer, *args, **kwargs):
def revert(self, loadbalancer_id, *args, **kwargs):
"""Handle failed listeners updates."""
LOG.warning("Reverting listeners updates.")
loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(),
id=loadbalancer_id)
for listener in loadbalancer.listeners:
self.task_utils.mark_listener_prov_status_error(
listener.id)

View File

@ -38,9 +38,8 @@ class TestListenerFlows(base.TestCase):
self.assertIn(constants.LISTENERS, listener_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, listener_flow.requires)
self.assertIn(constants.LOADBALANCER, listener_flow.requires)
self.assertEqual(3, len(listener_flow.requires))
self.assertEqual(2, len(listener_flow.requires))
self.assertEqual(0, len(listener_flow.provides))
def test_get_delete_listener_flow(self, mock_get_net_driver):
@ -78,9 +77,8 @@ class TestListenerFlows(base.TestCase):
self.assertIn(constants.UPDATE_DICT, listener_flow.requires)
self.assertIn(constants.LISTENERS, listener_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, listener_flow.requires)
self.assertIn(constants.LOADBALANCER, listener_flow.requires)
self.assertEqual(5, len(listener_flow.requires))
self.assertEqual(4, len(listener_flow.requires))
self.assertEqual(0, len(listener_flow.provides))
def test_get_create_all_listeners_flow(self, mock_get_net_driver):

View File

@ -137,7 +137,7 @@ class TestLoadBalancerFlows(base.TestCase):
self.assertIn(constants.UPDATE_DICT, lb_flow.requires)
self.assertEqual(0, len(lb_flow.provides))
self.assertEqual(2, len(lb_flow.requires))
self.assertEqual(3, len(lb_flow.requires))
def test_get_post_lb_amp_association_flow(self, mock_get_net_driver):
amp_flow = self.LBFlow.get_post_lb_amp_association_flow(

View File

@ -107,40 +107,6 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_amphora_repo_update.assert_called_once_with(
_session_mock, AMP_ID, status=constants.ERROR)
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get')
def test_listener_update(self,
mock_lb_get,
mock_driver,
mock_generate_uuid,
mock_log,
mock_get_session,
mock_listener_repo_get,
mock_listener_repo_update,
mock_amphora_repo_update):
listener_update_obj = amphora_driver_tasks.ListenersUpdate()
listener_update_obj.execute(_load_balancer_mock)
mock_driver.update.assert_called_once_with(_load_balancer_mock)
# Test the revert
amp = listener_update_obj.revert(_load_balancer_mock)
repo.ListenerRepository.update.assert_called_once_with(
_session_mock,
id=LISTENER_ID,
provisioning_status=constants.ERROR)
self.assertIsNone(amp)
# Test the revert with exception
repo.ListenerRepository.update.reset_mock()
mock_listener_repo_update.side_effect = Exception('fail')
amp = listener_update_obj.revert(_load_balancer_mock)
repo.ListenerRepository.update.assert_called_once_with(
_session_mock,
id=LISTENER_ID,
provisioning_status=constants.ERROR)
self.assertIsNone(amp)
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get')
def test_listeners_update(self,
mock_lb_get,
@ -157,15 +123,15 @@ class TestAmphoraDriverTasks(base.TestCase):
data_models.Listener(id='listener2')]
vip = data_models.Vip(ip_address='10.0.0.1')
lb = data_models.LoadBalancer(id=LB_ID, listeners=listeners, vip=vip)
mock_lb_get.return_value = lb
listeners_update_obj.execute(lb)
mock_lb_get.side_effect = [lb, None, lb]
listeners_update_obj.execute(lb.id)
mock_driver.update.assert_called_once_with(lb)
self.assertEqual(1, mock_driver.update.call_count)
mock_lb_get.reset_mock()
mock_driver.update.reset_mock()
listeners_update_obj.execute(None)
mock_lb_get.assert_not_called()
mock_driver.update.assert_not_called()
# Test the revert
amp = listeners_update_obj.revert(lb)

View File

@ -411,8 +411,6 @@ class TestControllerWorker(base.TestCase):
constants.UPDATE_DICT:
LISTENER_UPDATE_DICT,
constants.LOADBALANCER_ID: LB_ID,
constants.LOADBALANCER:
_load_balancer_mock,
constants.LISTENERS:
[listener_dict]}))
@ -718,6 +716,8 @@ class TestControllerWorker(base.TestCase):
store={constants.UPDATE_DICT: change,
constants.LOADBALANCER:
_load_balancer_mock,
constants.LOADBALANCER_ID:
_load_balancer_mock.id,
}))
_flow_mock.run.assert_called_once_with()