Merge "Update ListenersUpdate for lb/listener dicts"
This commit is contained in:
commit
45cb1e4e16
octavia
controller/worker/v2
controller_worker.py
flows
health_monitor_flows.pyl7policy_flows.pyl7rule_flows.pylistener_flows.pyload_balancer_flows.pymember_flows.pypool_flows.py
tasks
tests/unit/controller/worker/v2
@ -296,7 +296,6 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
store={constants.LISTENER: listener,
|
||||
constants.UPDATE_DICT: listener_updates,
|
||||
constants.LOADBALANCER_ID: db_lb.id,
|
||||
constants.LOADBALANCER: db_lb,
|
||||
constants.LISTENERS: [listener]})
|
||||
with tf_logging.DynamicLoggingListener(update_listener_tf, log=LOG):
|
||||
update_listener_tf.run()
|
||||
@ -389,6 +388,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
update_lb_tf = self._taskflow_load(
|
||||
self._lb_flows.get_update_load_balancer_flow(),
|
||||
store={constants.LOADBALANCER: lb,
|
||||
constants.LOADBALANCER_ID: lb.id,
|
||||
constants.UPDATE_DICT: load_balancer_updates})
|
||||
|
||||
with tf_logging.DynamicLoggingListener(update_lb_tf,
|
||||
|
@ -36,7 +36,7 @@ class HealthMonitorFlows(object):
|
||||
create_hm_flow.add(database_tasks.MarkHealthMonitorPendingCreateInDB(
|
||||
requires=constants.HEALTH_MON))
|
||||
create_hm_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
create_hm_flow.add(database_tasks.MarkHealthMonitorActiveInDB(
|
||||
requires=constants.HEALTH_MON))
|
||||
create_hm_flow.add(database_tasks.MarkPoolActiveInDB(
|
||||
@ -59,7 +59,7 @@ class HealthMonitorFlows(object):
|
||||
delete_hm_flow.add(database_tasks.MarkHealthMonitorPendingDeleteInDB(
|
||||
requires=constants.HEALTH_MON))
|
||||
delete_hm_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
delete_hm_flow.add(database_tasks.DeleteHealthMonitorInDB(
|
||||
requires=constants.HEALTH_MON))
|
||||
delete_hm_flow.add(database_tasks.DecrementHealthMonitorQuota(
|
||||
@ -88,7 +88,7 @@ class HealthMonitorFlows(object):
|
||||
update_hm_flow.add(database_tasks.MarkHealthMonitorPendingUpdateInDB(
|
||||
requires=constants.HEALTH_MON))
|
||||
update_hm_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
update_hm_flow.add(database_tasks.UpdateHealthMonInDB(
|
||||
requires=[constants.HEALTH_MON, constants.UPDATE_DICT]))
|
||||
update_hm_flow.add(database_tasks.MarkHealthMonitorActiveInDB(
|
||||
|
@ -36,7 +36,7 @@ class L7PolicyFlows(object):
|
||||
create_l7policy_flow.add(database_tasks.MarkL7PolicyPendingCreateInDB(
|
||||
requires=constants.L7POLICY))
|
||||
create_l7policy_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
create_l7policy_flow.add(database_tasks.MarkL7PolicyActiveInDB(
|
||||
requires=constants.L7POLICY))
|
||||
create_l7policy_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
|
||||
@ -57,7 +57,7 @@ class L7PolicyFlows(object):
|
||||
delete_l7policy_flow.add(database_tasks.MarkL7PolicyPendingDeleteInDB(
|
||||
requires=constants.L7POLICY))
|
||||
delete_l7policy_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
delete_l7policy_flow.add(database_tasks.DeleteL7PolicyInDB(
|
||||
requires=constants.L7POLICY))
|
||||
delete_l7policy_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
|
||||
@ -78,7 +78,7 @@ class L7PolicyFlows(object):
|
||||
update_l7policy_flow.add(database_tasks.MarkL7PolicyPendingUpdateInDB(
|
||||
requires=constants.L7POLICY))
|
||||
update_l7policy_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
update_l7policy_flow.add(database_tasks.UpdateL7PolicyInDB(
|
||||
requires=[constants.L7POLICY, constants.UPDATE_DICT]))
|
||||
update_l7policy_flow.add(database_tasks.MarkL7PolicyActiveInDB(
|
||||
|
@ -36,7 +36,7 @@ class L7RuleFlows(object):
|
||||
create_l7rule_flow.add(database_tasks.MarkL7RulePendingCreateInDB(
|
||||
requires=constants.L7RULE))
|
||||
create_l7rule_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
create_l7rule_flow.add(database_tasks.MarkL7RuleActiveInDB(
|
||||
requires=constants.L7RULE))
|
||||
create_l7rule_flow.add(database_tasks.MarkL7PolicyActiveInDB(
|
||||
@ -59,7 +59,7 @@ class L7RuleFlows(object):
|
||||
delete_l7rule_flow.add(database_tasks.MarkL7RulePendingDeleteInDB(
|
||||
requires=constants.L7RULE))
|
||||
delete_l7rule_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
delete_l7rule_flow.add(database_tasks.DeleteL7RuleInDB(
|
||||
requires=constants.L7RULE))
|
||||
delete_l7rule_flow.add(database_tasks.MarkL7PolicyActiveInDB(
|
||||
@ -82,7 +82,7 @@ class L7RuleFlows(object):
|
||||
update_l7rule_flow.add(database_tasks.MarkL7RulePendingUpdateInDB(
|
||||
requires=constants.L7RULE))
|
||||
update_l7rule_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
update_l7rule_flow.add(database_tasks.UpdateL7RuleInDB(
|
||||
requires=[constants.L7RULE, constants.UPDATE_DICT]))
|
||||
update_l7rule_flow.add(database_tasks.MarkL7RuleActiveInDB(
|
||||
|
@ -33,7 +33,7 @@ class ListenerFlows(object):
|
||||
create_listener_flow.add(lifecycle_tasks.ListenersToErrorOnRevertTask(
|
||||
requires=constants.LISTENERS))
|
||||
create_listener_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
create_listener_flow.add(network_tasks.UpdateVIP(
|
||||
requires=constants.LISTENERS))
|
||||
create_listener_flow.add(database_tasks.
|
||||
@ -57,7 +57,7 @@ class ListenerFlows(object):
|
||||
requires=constants.LOADBALANCER_ID,
|
||||
provides=constants.LOADBALANCER))
|
||||
create_all_listeners_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
create_all_listeners_flow.add(network_tasks.UpdateVIP(
|
||||
requires=constants.LISTENERS))
|
||||
return create_all_listeners_flow
|
||||
@ -114,7 +114,7 @@ class ListenerFlows(object):
|
||||
update_listener_flow.add(lifecycle_tasks.ListenerToErrorOnRevertTask(
|
||||
requires=constants.LISTENER))
|
||||
update_listener_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
update_listener_flow.add(network_tasks.UpdateVIP(
|
||||
requires=constants.LISTENERS))
|
||||
update_listener_flow.add(database_tasks.UpdateListenerInDB(
|
||||
|
@ -338,7 +338,7 @@ class LoadBalancerFlows(object):
|
||||
update_LB_flow.add(network_tasks.ApplyQos(
|
||||
requires=(constants.LOADBALANCER, constants.UPDATE_DICT)))
|
||||
update_LB_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
update_LB_flow.add(database_tasks.UpdateLoadbalancerInDB(
|
||||
requires=[constants.LOADBALANCER, constants.UPDATE_DICT]))
|
||||
update_LB_flow.add(database_tasks.MarkLBActiveInDB(
|
||||
|
@ -47,7 +47,7 @@ class MemberFlows(object):
|
||||
requires=(constants.LOADBALANCER, constants.ADDED_PORTS)
|
||||
))
|
||||
create_member_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
create_member_flow.add(database_tasks.MarkMemberActiveInDB(
|
||||
requires=constants.MEMBER))
|
||||
create_member_flow.add(database_tasks.MarkPoolActiveInDB(
|
||||
@ -73,7 +73,7 @@ class MemberFlows(object):
|
||||
delete_member_flow.add(database_tasks.MarkMemberPendingDeleteInDB(
|
||||
requires=constants.MEMBER))
|
||||
delete_member_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
delete_member_flow.add(database_tasks.DeleteMemberInDB(
|
||||
requires=constants.MEMBER))
|
||||
delete_member_flow.add(database_tasks.DecrementMemberQuota(
|
||||
@ -101,7 +101,7 @@ class MemberFlows(object):
|
||||
update_member_flow.add(database_tasks.MarkMemberPendingUpdateInDB(
|
||||
requires=constants.MEMBER))
|
||||
update_member_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
update_member_flow.add(database_tasks.UpdateMemberInDB(
|
||||
requires=[constants.MEMBER, constants.UPDATE_DICT]))
|
||||
update_member_flow.add(database_tasks.MarkMemberActiveInDB(
|
||||
@ -190,7 +190,7 @@ class MemberFlows(object):
|
||||
|
||||
# Update the Listener (this makes the changes active on the Amp)
|
||||
batch_update_members_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
|
||||
# Mark all the members ACTIVE here, then pool then LB/Listeners
|
||||
batch_update_members_flow.add(unordered_members_active_flow)
|
||||
|
@ -36,7 +36,7 @@ class PoolFlows(object):
|
||||
create_pool_flow.add(database_tasks.MarkPoolPendingCreateInDB(
|
||||
requires=constants.POOL_ID))
|
||||
create_pool_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
create_pool_flow.add(database_tasks.MarkPoolActiveInDB(
|
||||
requires=constants.POOL_ID))
|
||||
create_pool_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
|
||||
@ -59,7 +59,7 @@ class PoolFlows(object):
|
||||
delete_pool_flow.add(database_tasks.CountPoolChildrenForQuota(
|
||||
requires=constants.POOL_ID, provides=constants.POOL_CHILD_COUNT))
|
||||
delete_pool_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
delete_pool_flow.add(database_tasks.DeletePoolInDB(
|
||||
requires=constants.POOL_ID))
|
||||
delete_pool_flow.add(database_tasks.DecrementPoolQuota(
|
||||
@ -109,7 +109,7 @@ class PoolFlows(object):
|
||||
update_pool_flow.add(database_tasks.MarkPoolPendingUpdateInDB(
|
||||
requires=constants.POOL_ID))
|
||||
update_pool_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=constants.LOADBALANCER))
|
||||
requires=constants.LOADBALANCER_ID))
|
||||
update_pool_flow.add(database_tasks.UpdatePoolInDB(
|
||||
requires=[constants.POOL_ID, constants.UPDATE_DICT]))
|
||||
update_pool_flow.add(database_tasks.MarkPoolActiveInDB(
|
||||
|
@ -92,15 +92,22 @@ class AmpListenersUpdate(BaseAmphoraTask):
|
||||
class ListenersUpdate(BaseAmphoraTask):
|
||||
"""Task to update amphora with all specified listeners' configurations."""
|
||||
|
||||
def execute(self, loadbalancer):
|
||||
def execute(self, loadbalancer_id):
|
||||
"""Execute updates per listener for an amphora."""
|
||||
self.amphora_driver.update(loadbalancer)
|
||||
loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(),
|
||||
id=loadbalancer_id)
|
||||
if loadbalancer:
|
||||
self.amphora_driver.update(loadbalancer)
|
||||
else:
|
||||
LOG.error('Load balancer %s for listeners update not found. '
|
||||
'Skipping update.', loadbalancer_id)
|
||||
|
||||
def revert(self, loadbalancer, *args, **kwargs):
|
||||
def revert(self, loadbalancer_id, *args, **kwargs):
|
||||
"""Handle failed listeners updates."""
|
||||
|
||||
LOG.warning("Reverting listeners updates.")
|
||||
|
||||
loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(),
|
||||
id=loadbalancer_id)
|
||||
for listener in loadbalancer.listeners:
|
||||
self.task_utils.mark_listener_prov_status_error(
|
||||
listener.id)
|
||||
|
@ -38,9 +38,8 @@ class TestListenerFlows(base.TestCase):
|
||||
|
||||
self.assertIn(constants.LISTENERS, listener_flow.requires)
|
||||
self.assertIn(constants.LOADBALANCER_ID, listener_flow.requires)
|
||||
self.assertIn(constants.LOADBALANCER, listener_flow.requires)
|
||||
|
||||
self.assertEqual(3, len(listener_flow.requires))
|
||||
self.assertEqual(2, len(listener_flow.requires))
|
||||
self.assertEqual(0, len(listener_flow.provides))
|
||||
|
||||
def test_get_delete_listener_flow(self, mock_get_net_driver):
|
||||
@ -78,9 +77,8 @@ class TestListenerFlows(base.TestCase):
|
||||
self.assertIn(constants.UPDATE_DICT, listener_flow.requires)
|
||||
self.assertIn(constants.LISTENERS, listener_flow.requires)
|
||||
self.assertIn(constants.LOADBALANCER_ID, listener_flow.requires)
|
||||
self.assertIn(constants.LOADBALANCER, listener_flow.requires)
|
||||
|
||||
self.assertEqual(5, len(listener_flow.requires))
|
||||
self.assertEqual(4, len(listener_flow.requires))
|
||||
self.assertEqual(0, len(listener_flow.provides))
|
||||
|
||||
def test_get_create_all_listeners_flow(self, mock_get_net_driver):
|
||||
|
@ -137,7 +137,7 @@ class TestLoadBalancerFlows(base.TestCase):
|
||||
self.assertIn(constants.UPDATE_DICT, lb_flow.requires)
|
||||
|
||||
self.assertEqual(0, len(lb_flow.provides))
|
||||
self.assertEqual(2, len(lb_flow.requires))
|
||||
self.assertEqual(3, len(lb_flow.requires))
|
||||
|
||||
def test_get_post_lb_amp_association_flow(self, mock_get_net_driver):
|
||||
amp_flow = self.LBFlow.get_post_lb_amp_association_flow(
|
||||
|
@ -107,40 +107,6 @@ class TestAmphoraDriverTasks(base.TestCase):
|
||||
mock_amphora_repo_update.assert_called_once_with(
|
||||
_session_mock, AMP_ID, status=constants.ERROR)
|
||||
|
||||
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get')
|
||||
def test_listener_update(self,
|
||||
mock_lb_get,
|
||||
mock_driver,
|
||||
mock_generate_uuid,
|
||||
mock_log,
|
||||
mock_get_session,
|
||||
mock_listener_repo_get,
|
||||
mock_listener_repo_update,
|
||||
mock_amphora_repo_update):
|
||||
|
||||
listener_update_obj = amphora_driver_tasks.ListenersUpdate()
|
||||
listener_update_obj.execute(_load_balancer_mock)
|
||||
|
||||
mock_driver.update.assert_called_once_with(_load_balancer_mock)
|
||||
|
||||
# Test the revert
|
||||
amp = listener_update_obj.revert(_load_balancer_mock)
|
||||
repo.ListenerRepository.update.assert_called_once_with(
|
||||
_session_mock,
|
||||
id=LISTENER_ID,
|
||||
provisioning_status=constants.ERROR)
|
||||
self.assertIsNone(amp)
|
||||
|
||||
# Test the revert with exception
|
||||
repo.ListenerRepository.update.reset_mock()
|
||||
mock_listener_repo_update.side_effect = Exception('fail')
|
||||
amp = listener_update_obj.revert(_load_balancer_mock)
|
||||
repo.ListenerRepository.update.assert_called_once_with(
|
||||
_session_mock,
|
||||
id=LISTENER_ID,
|
||||
provisioning_status=constants.ERROR)
|
||||
self.assertIsNone(amp)
|
||||
|
||||
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get')
|
||||
def test_listeners_update(self,
|
||||
mock_lb_get,
|
||||
@ -157,15 +123,15 @@ class TestAmphoraDriverTasks(base.TestCase):
|
||||
data_models.Listener(id='listener2')]
|
||||
vip = data_models.Vip(ip_address='10.0.0.1')
|
||||
lb = data_models.LoadBalancer(id=LB_ID, listeners=listeners, vip=vip)
|
||||
mock_lb_get.return_value = lb
|
||||
listeners_update_obj.execute(lb)
|
||||
mock_lb_get.side_effect = [lb, None, lb]
|
||||
listeners_update_obj.execute(lb.id)
|
||||
mock_driver.update.assert_called_once_with(lb)
|
||||
self.assertEqual(1, mock_driver.update.call_count)
|
||||
|
||||
mock_lb_get.reset_mock()
|
||||
mock_driver.update.reset_mock()
|
||||
|
||||
listeners_update_obj.execute(None)
|
||||
mock_lb_get.assert_not_called()
|
||||
mock_driver.update.assert_not_called()
|
||||
|
||||
# Test the revert
|
||||
amp = listeners_update_obj.revert(lb)
|
||||
|
@ -411,8 +411,6 @@ class TestControllerWorker(base.TestCase):
|
||||
constants.UPDATE_DICT:
|
||||
LISTENER_UPDATE_DICT,
|
||||
constants.LOADBALANCER_ID: LB_ID,
|
||||
constants.LOADBALANCER:
|
||||
_load_balancer_mock,
|
||||
constants.LISTENERS:
|
||||
[listener_dict]}))
|
||||
|
||||
@ -718,6 +716,8 @@ class TestControllerWorker(base.TestCase):
|
||||
store={constants.UPDATE_DICT: change,
|
||||
constants.LOADBALANCER:
|
||||
_load_balancer_mock,
|
||||
constants.LOADBALANCER_ID:
|
||||
_load_balancer_mock.id,
|
||||
}))
|
||||
|
||||
_flow_mock.run.assert_called_once_with()
|
||||
|
Loading…
x
Reference in New Issue
Block a user