Merge "Transition l7policy flows to dicts"

This commit is contained in:
Zuul 2020-01-26 18:19:22 +00:00 committed by Gerrit Code Review
commit 789f5310ca
13 changed files with 167 additions and 162 deletions

View File

@ -302,21 +302,20 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
# L7 Policy
def l7policy_create(self, l7policy):
payload = {consts.L7POLICY_ID: l7policy.l7policy_id}
payload = {consts.L7POLICY: l7policy.to_dict()}
self.client.cast({}, 'create_l7policy', **payload)
def l7policy_delete(self, l7policy):
l7policy_id = l7policy.l7policy_id
payload = {consts.L7POLICY_ID: l7policy_id}
payload = {consts.L7POLICY: l7policy.to_dict()}
self.client.cast({}, 'delete_l7policy', **payload)
def l7policy_update(self, old_l7policy, new_l7policy):
l7policy_dict = new_l7policy.to_dict()
if 'admin_state_up' in l7policy_dict:
l7policy_dict['enabled'] = l7policy_dict.pop('admin_state_up')
l7policy_id = l7policy_dict.pop('l7policy_id')
l7policy_dict['enabled'] = l7policy_dict.pop(consts.ADMIN_STATE_UP)
l7policy_dict.pop(consts.L7POLICY_ID)
payload = {consts.L7POLICY_ID: l7policy_id,
payload = {consts.ORIGINAL_L7POLICY: old_l7policy.to_dict(),
consts.L7POLICY_UPDATES: l7policy_dict}
self.client.cast({}, 'update_l7policy', **payload)

View File

@ -295,6 +295,7 @@ SUPPORTED_TASKFLOW_ENGINE_TYPES = ['serial', 'parallel']
ACTIVE_CONNECTIONS = 'active_connections'
ADD_NICS = 'add_nics'
ADDED_PORTS = 'added_ports'
ADMIN_STATE_UP = 'admin_state_up'
AMP_DATA = 'amp_data'
AMPHORA = 'amphora'
AMPHORA_ID = 'amphora_id'
@ -365,6 +366,7 @@ NETWORK_ID = 'network_id'
NICS = 'nics'
OBJECT = 'object'
ORIGINAL_HEALTH_MONITOR = 'original_health_monitor'
ORIGINAL_L7POLICY = 'original_l7policy'
ORIGINAL_LISTENER = 'original_listener'
ORIGINAL_LOADBALANCER = 'original_load_balancer'
ORIGINAL_MEMBER = 'original_member'

View File

@ -136,17 +136,20 @@ class Endpoints(object):
LOG.info('Deleting member \'%s\'...', member.get(constants.MEMBER_ID))
self.worker.delete_member(member)
def create_l7policy(self, context, l7policy_id):
LOG.info('Creating l7policy \'%s\'...', l7policy_id)
self.worker.create_l7policy(l7policy_id)
def create_l7policy(self, context, l7policy):
LOG.info('Creating l7policy \'%s\'...',
l7policy.get(constants.L7POLICY_ID))
self.worker.create_l7policy(l7policy)
def update_l7policy(self, context, l7policy_id, l7policy_updates):
LOG.info('Updating l7policy \'%s\'...', l7policy_id)
self.worker.update_l7policy(l7policy_id, l7policy_updates)
def update_l7policy(self, context, original_l7policy, l7policy_updates):
LOG.info('Updating l7policy \'%s\'...', original_l7policy.get(
constants.L7POLICY_ID))
self.worker.update_l7policy(original_l7policy, l7policy_updates)
def delete_l7policy(self, context, l7policy_id):
LOG.info('Deleting l7policy \'%s\'...', l7policy_id)
self.worker.delete_l7policy(l7policy_id)
def delete_l7policy(self, context, l7policy):
LOG.info('Deleting l7policy \'%s\'...', l7policy.get(
constants.L7POLICY_ID))
self.worker.delete_l7policy(l7policy)
def create_l7rule(self, context, l7rule_id):
LOG.info('Creating l7rule \'%s\'...', l7rule_id)

View File

@ -658,106 +658,73 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
log=LOG):
update_pool_tf.run()
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def create_l7policy(self, l7policy_id):
def create_l7policy(self, l7policy):
"""Creates an L7 Policy.
:param l7policy_id: ID of the l7policy to create
:param l7policy: Provider dict of the l7policy to create
:returns: None
:raises NoResultFound: Unable to find the object
"""
l7policy = self._l7policy_repo.get(db_apis.get_session(),
id=l7policy_id)
if not l7policy:
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
'60 seconds.', 'l7policy', l7policy_id)
raise db_exceptions.NoResultFound
load_balancer = l7policy.listener.load_balancer
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
load_balancer).to_dict()
db_listener = self._listener_repo.get(
db_apis.get_session(), id=l7policy[constants.LISTENER_ID])
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
[l7policy.listener]))
[db_listener]))
create_l7policy_tf = self._taskflow_load(
self._l7policy_flows.get_create_l7policy_flow(),
store={constants.L7POLICY: l7policy,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb})
constants.LOADBALANCER_ID: db_listener.load_balancer.id
})
with tf_logging.DynamicLoggingListener(create_l7policy_tf,
log=LOG):
create_l7policy_tf.run()
def delete_l7policy(self, l7policy_id):
def delete_l7policy(self, l7policy):
"""Deletes an L7 policy.
:param l7policy_id: ID of the l7policy to delete
:param l7policy: Provider dict of the l7policy to delete
:returns: None
:raises L7PolicyNotFound: The referenced l7policy was not found
"""
l7policy = self._l7policy_repo.get(db_apis.get_session(),
id=l7policy_id)
load_balancer = l7policy.listener.load_balancer
db_listener = self._listener_repo.get(
db_apis.get_session(), id=l7policy[constants.LISTENER_ID])
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
[l7policy.listener]))
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
load_balancer).to_dict()
[db_listener]))
delete_l7policy_tf = self._taskflow_load(
self._l7policy_flows.get_delete_l7policy_flow(),
store={constants.L7POLICY: l7policy,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb})
constants.LOADBALANCER_ID: db_listener.load_balancer.id
})
with tf_logging.DynamicLoggingListener(delete_l7policy_tf,
log=LOG):
delete_l7policy_tf.run()
def update_l7policy(self, l7policy_id, l7policy_updates):
def update_l7policy(self, original_l7policy, l7policy_updates):
"""Updates an L7 policy.
:param l7policy_id: ID of the l7policy to update
:param l7policy: Provider dict of the l7policy to update
:param l7policy_updates: Dict containing updated l7policy attributes
:returns: None
:raises L7PolicyNotFound: The referenced l7policy was not found
"""
l7policy = None
try:
l7policy = self._get_db_obj_until_pending_update(
self._l7policy_repo, l7policy_id)
except tenacity.RetryError as e:
LOG.warning('L7 policy did not go into %s in 60 seconds. '
'This either due to an in-progress Octavia upgrade '
'or an overloaded and failing database. Assuming '
'an upgrade is in progress and continuing.',
constants.PENDING_UPDATE)
l7policy = e.last_attempt.result()
load_balancer = l7policy.listener.load_balancer
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
load_balancer).to_dict()
db_listener = self._listener_repo.get(
db_apis.get_session(), id=original_l7policy[constants.LISTENER_ID])
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
[l7policy.listener]))
[db_listener]))
update_l7policy_tf = self._taskflow_load(
self._l7policy_flows.get_update_l7policy_flow(),
store={constants.L7POLICY: l7policy,
store={constants.L7POLICY: original_l7policy,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER: provider_lb,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER_ID: db_listener.load_balancer.id,
constants.UPDATE_DICT: l7policy_updates})
with tf_logging.DynamicLoggingListener(update_l7policy_tf,
log=LOG):
@ -790,11 +757,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
[l7policy.listener]))
l7policy_dict = provider_utils.db_l7policy_to_provider_l7policy(
l7policy)
create_l7rule_tf = self._taskflow_load(
self._l7rule_flows.get_create_l7rule_flow(),
store={constants.L7RULE: l7rule,
constants.L7POLICY: l7policy,
constants.L7POLICY: l7policy_dict.to_dict(),
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb})
@ -820,11 +789,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
load_balancer).to_dict()
l7policy_dict = provider_utils.db_l7policy_to_provider_l7policy(
l7policy)
delete_l7rule_tf = self._taskflow_load(
self._l7rule_flows.get_delete_l7rule_flow(),
store={constants.L7RULE: l7rule,
constants.L7POLICY: l7policy,
constants.L7POLICY: l7policy_dict.to_dict(),
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb})
@ -860,11 +831,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
[l7policy.listener]))
l7policy_dict = provider_utils.db_l7policy_to_provider_l7policy(
l7policy)
update_l7rule_tf = self._taskflow_load(
self._l7rule_flows.get_update_l7rule_flow(),
store={constants.L7RULE: l7rule,
constants.L7POLICY: l7policy,
constants.L7POLICY: l7policy_dict.to_dict(),
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER: provider_lb,
constants.LOADBALANCER_ID: load_balancer.id,

View File

@ -32,7 +32,7 @@ class L7PolicyFlows(object):
create_l7policy_flow.add(lifecycle_tasks.L7PolicyToErrorOnRevertTask(
requires=[constants.L7POLICY,
constants.LISTENERS,
constants.LOADBALANCER]))
constants.LOADBALANCER_ID]))
create_l7policy_flow.add(database_tasks.MarkL7PolicyPendingCreateInDB(
requires=constants.L7POLICY))
create_l7policy_flow.add(amphora_driver_tasks.ListenersUpdate(
@ -53,7 +53,7 @@ class L7PolicyFlows(object):
delete_l7policy_flow.add(lifecycle_tasks.L7PolicyToErrorOnRevertTask(
requires=[constants.L7POLICY,
constants.LISTENERS,
constants.LOADBALANCER]))
constants.LOADBALANCER_ID]))
delete_l7policy_flow.add(database_tasks.MarkL7PolicyPendingDeleteInDB(
requires=constants.L7POLICY))
delete_l7policy_flow.add(amphora_driver_tasks.ListenersUpdate(
@ -74,7 +74,7 @@ class L7PolicyFlows(object):
update_l7policy_flow.add(lifecycle_tasks.L7PolicyToErrorOnRevertTask(
requires=[constants.L7POLICY,
constants.LISTENERS,
constants.LOADBALANCER]))
constants.LOADBALANCER_ID]))
update_l7policy_flow.add(database_tasks.MarkL7PolicyPendingUpdateInDB(
requires=constants.L7POLICY))
update_l7policy_flow.add(amphora_driver_tasks.ListenersUpdate(

View File

@ -330,8 +330,10 @@ class DeleteL7PolicyInDB(BaseDatabaseTask):
:returns: None
"""
LOG.debug("Delete in DB for l7policy id: %s ", l7policy.id)
self.l7policy_repo.delete(db_apis.get_session(), id=l7policy.id)
LOG.debug("Delete in DB for l7policy id: %s ",
l7policy[constants.L7POLICY_ID])
self.l7policy_repo.delete(db_apis.get_session(),
id=l7policy[constants.L7POLICY_ID])
def revert(self, l7policy, *args, **kwargs):
"""Mark the l7policy ERROR since the delete couldn't happen
@ -340,14 +342,17 @@ class DeleteL7PolicyInDB(BaseDatabaseTask):
:returns: None
"""
LOG.warning("Reverting delete in DB for l7policy id %s", l7policy.id)
LOG.warning("Reverting delete in DB for l7policy id %s",
l7policy[constants.L7POLICY_ID])
try:
self.l7policy_repo.update(db_apis.get_session(), l7policy.id,
self.l7policy_repo.update(db_apis.get_session(),
l7policy[constants.L7POLICY_ID],
provisioning_status=constants.ERROR)
except Exception as e:
LOG.error("Failed to update l7policy %(l7policy)s "
"provisioning_status to ERROR due to: %(except)s",
{'l7policy': l7policy.id, 'except': e})
{'l7policy': l7policy[constants.L7POLICY_ID],
'except': e})
class DeleteL7RuleInDB(BaseDatabaseTask):
@ -1594,8 +1599,10 @@ class UpdateL7PolicyInDB(BaseDatabaseTask):
:returns: None
"""
LOG.debug("Update DB for l7policy id: %s ", l7policy.id)
self.l7policy_repo.update(db_apis.get_session(), l7policy.id,
LOG.debug("Update DB for l7policy id: %s",
l7policy[constants.L7POLICY_ID])
self.l7policy_repo.update(db_apis.get_session(),
l7policy[constants.L7POLICY_ID],
**update_dict)
def revert(self, l7policy, *args, **kwargs):
@ -1606,14 +1613,15 @@ class UpdateL7PolicyInDB(BaseDatabaseTask):
"""
LOG.warning("Reverting update l7policy in DB "
"for l7policy id %s", l7policy.id)
"for l7policy id %s", l7policy[constants.L7POLICY_ID])
try:
self.l7policy_repo.update(db_apis.get_session(), l7policy.id,
self.l7policy_repo.update(db_apis.get_session(),
l7policy[constants.L7POLICY_ID],
provisioning_status=constants.ERROR)
except Exception as e:
LOG.error("Failed to update l7policy %(l7p)s provisioning_status "
"to ERROR due to: %(except)s", {'l7p': l7policy.id,
'except': e})
"to ERROR due to: %(except)s",
{'l7p': l7policy[constants.L7POLICY_ID], 'except': e})
class UpdateL7RuleInDB(BaseDatabaseTask):
@ -1841,7 +1849,7 @@ class MarkHealthMonitorActiveInDB(BaseDatabaseTask):
LOG.debug("Mark ACTIVE in DB for health monitor id: %s",
health_mon[constants.HEALTHMONITOR_ID])
op_status = (constants.ONLINE if health_mon['admin_state_up']
op_status = (constants.ONLINE if health_mon[constants.ADMIN_STATE_UP]
else constants.OFFLINE)
self.health_mon_repo.update(db_apis.get_session(),
health_mon[constants.HEALTHMONITOR_ID],
@ -1978,11 +1986,13 @@ class MarkL7PolicyActiveInDB(BaseDatabaseTask):
"""
LOG.debug("Mark ACTIVE in DB for l7policy id: %s",
l7policy.id)
op_status = constants.ONLINE if l7policy.enabled else constants.OFFLINE
l7policy[constants.L7POLICY_ID])
db_l7policy = self.l7policy_repo.get(
db_apis.get_session(), id=l7policy[constants.L7POLICY_ID])
op_status = (constants.ONLINE if db_l7policy.enabled
else constants.OFFLINE)
self.l7policy_repo.update(db_apis.get_session(),
l7policy.id,
l7policy[constants.L7POLICY_ID],
provisioning_status=constants.ACTIVE,
operating_status=op_status)
@ -1994,8 +2004,9 @@ class MarkL7PolicyActiveInDB(BaseDatabaseTask):
"""
LOG.warning("Reverting mark l7policy ACTIVE in DB "
"for l7policy id %s", l7policy.id)
self.task_utils.mark_l7policy_prov_status_error(l7policy.id)
"for l7policy id %s", l7policy[constants.L7POLICY_ID])
self.task_utils.mark_l7policy_prov_status_error(
l7policy[constants.L7POLICY_ID])
class MarkL7PolicyPendingCreateInDB(BaseDatabaseTask):
@ -2012,9 +2023,9 @@ class MarkL7PolicyPendingCreateInDB(BaseDatabaseTask):
"""
LOG.debug("Mark PENDING CREATE in DB for l7policy id: %s",
l7policy.id)
l7policy[constants.L7POLICY_ID])
self.l7policy_repo.update(db_apis.get_session(),
l7policy.id,
l7policy[constants.L7POLICY_ID],
provisioning_status=constants.PENDING_CREATE)
def revert(self, l7policy, *args, **kwargs):
@ -2025,8 +2036,9 @@ class MarkL7PolicyPendingCreateInDB(BaseDatabaseTask):
"""
LOG.warning("Reverting mark l7policy pending create in DB "
"for l7policy id %s", l7policy.id)
self.task_utils.mark_l7policy_prov_status_error(l7policy.id)
"for l7policy id %s", l7policy[constants.L7POLICY_ID])
self.task_utils.mark_l7policy_prov_status_error(
l7policy[constants.L7POLICY_ID])
class MarkL7PolicyPendingDeleteInDB(BaseDatabaseTask):
@ -2043,9 +2055,9 @@ class MarkL7PolicyPendingDeleteInDB(BaseDatabaseTask):
"""
LOG.debug("Mark PENDING DELETE in DB for l7policy id: %s",
l7policy.id)
l7policy[constants.L7POLICY_ID])
self.l7policy_repo.update(db_apis.get_session(),
l7policy.id,
l7policy[constants.L7POLICY_ID],
provisioning_status=constants.PENDING_DELETE)
def revert(self, l7policy, *args, **kwargs):
@ -2056,8 +2068,9 @@ class MarkL7PolicyPendingDeleteInDB(BaseDatabaseTask):
"""
LOG.warning("Reverting mark l7policy pending delete in DB "
"for l7policy id %s", l7policy.id)
self.task_utils.mark_l7policy_prov_status_error(l7policy.id)
"for l7policy id %s", l7policy[constants.L7POLICY_ID])
self.task_utils.mark_l7policy_prov_status_error(
l7policy[constants.L7POLICY_ID])
class MarkL7PolicyPendingUpdateInDB(BaseDatabaseTask):
@ -2074,9 +2087,9 @@ class MarkL7PolicyPendingUpdateInDB(BaseDatabaseTask):
"""
LOG.debug("Mark PENDING UPDATE in DB for l7policy id: %s",
l7policy.id)
l7policy[constants.L7POLICY_ID])
self.l7policy_repo.update(db_apis.get_session(),
l7policy.id,
l7policy[constants.L7POLICY_ID],
provisioning_status=(constants.
PENDING_UPDATE))
@ -2088,8 +2101,9 @@ class MarkL7PolicyPendingUpdateInDB(BaseDatabaseTask):
"""
LOG.warning("Reverting mark l7policy pending update in DB "
"for l7policy id %s", l7policy.id)
self.task_utils.mark_l7policy_prov_status_error(l7policy.id)
"for l7policy id %s", l7policy[constants.L7POLICY_ID])
self.task_utils.mark_l7policy_prov_status_error(
l7policy[constants.L7POLICY_ID])
class MarkL7RuleActiveInDB(BaseDatabaseTask):

View File

@ -68,13 +68,13 @@ class HealthMonitorToErrorOnRevertTask(BaseLifecycleTask):
class L7PolicyToErrorOnRevertTask(BaseLifecycleTask):
"""Task to set a l7policy to ERROR on revert."""
def execute(self, l7policy, listeners, loadbalancer):
def execute(self, l7policy, listeners, loadbalancer_id):
pass
def revert(self, l7policy, listeners, loadbalancer, *args, **kwargs):
self.task_utils.mark_l7policy_prov_status_error(l7policy.id)
self.task_utils.mark_loadbalancer_prov_status_active(
loadbalancer[constants.LOADBALANCER_ID])
def revert(self, l7policy, listeners, loadbalancer_id, *args, **kwargs):
self.task_utils.mark_l7policy_prov_status_error(
l7policy[constants.L7POLICY_ID])
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer_id)
for listener in listeners:
self.task_utils.mark_listener_prov_status_active(
listener[constants.LISTENER_ID])

View File

@ -538,7 +538,7 @@ class TestAmphoraDriver(base.TestRpc):
provider_l7policy = driver_dm.L7Policy(
l7policy_id=self.sample_data.l7policy1_id)
self.amp_driver.l7policy_create(provider_l7policy)
payload = {consts.L7POLICY_ID: self.sample_data.l7policy1_id}
payload = {consts.L7POLICY: provider_l7policy.to_dict()}
mock_cast.assert_called_with({}, 'create_l7policy', **payload)
@mock.patch('oslo_messaging.RPCClient.cast')
@ -546,7 +546,7 @@ class TestAmphoraDriver(base.TestRpc):
provider_l7policy = driver_dm.L7Policy(
l7policy_id=self.sample_data.l7policy1_id)
self.amp_driver.l7policy_delete(provider_l7policy)
payload = {consts.L7POLICY_ID: self.sample_data.l7policy1_id}
payload = {consts.L7POLICY: provider_l7policy.to_dict()}
mock_cast.assert_called_with({}, 'delete_l7policy', **payload)
@mock.patch('oslo_messaging.RPCClient.cast')
@ -558,7 +558,7 @@ class TestAmphoraDriver(base.TestRpc):
l7policy_dict = {'enabled': True}
self.amp_driver.l7policy_update(old_provider_l7policy,
provider_l7policy)
payload = {consts.L7POLICY_ID: self.sample_data.l7policy1_id,
payload = {consts.ORIGINAL_L7POLICY: old_provider_l7policy.to_dict(),
consts.L7POLICY_UPDATES: l7policy_dict}
mock_cast.assert_called_with({}, 'update_l7policy', **payload)
@ -571,7 +571,7 @@ class TestAmphoraDriver(base.TestRpc):
l7policy_dict = {'name': 'Great L7Policy'}
self.amp_driver.l7policy_update(old_provider_l7policy,
provider_l7policy)
payload = {consts.L7POLICY_ID: self.sample_data.l7policy1_id,
payload = {consts.ORIGINAL_L7POLICY: old_provider_l7policy.to_dict(),
consts.L7POLICY_UPDATES: l7policy_dict}
mock_cast.assert_called_with({}, 'update_l7policy', **payload)

View File

@ -154,20 +154,20 @@ class TestEndpoints(base.TestCase):
self.resource)
def test_create_l7policy(self):
self.ep.create_l7policy(self.context, self.resource_id)
self.ep.create_l7policy(self.context, self.resource)
self.ep.worker.create_l7policy.assert_called_once_with(
self.resource_id)
self.resource)
def test_update_l7policy(self):
self.ep.update_l7policy(self.context, self.resource_id,
self.ep.update_l7policy(self.context, self.resource,
self.resource_updates)
self.ep.worker.update_l7policy.assert_called_once_with(
self.resource_id, self.resource_updates)
self.resource, self.resource_updates)
def test_delete_l7policy(self):
self.ep.delete_l7policy(self.context, self.resource_id)
self.ep.delete_l7policy(self.context, self.resource)
self.ep.worker.delete_l7policy.assert_called_once_with(
self.resource_id)
self.resource)
def test_create_l7rule(self):
self.ep.create_l7rule(self.context, self.resource_id)

View File

@ -34,10 +34,10 @@ class TestL7PolicyFlows(base.TestCase):
self.assertIsInstance(l7policy_flow, flow.Flow)
self.assertIn(constants.LISTENERS, l7policy_flow.requires)
self.assertIn(constants.LOADBALANCER, l7policy_flow.requires)
self.assertIn(constants.L7POLICY, l7policy_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, l7policy_flow.requires)
self.assertEqual(4, len(l7policy_flow.requires))
self.assertEqual(3, len(l7policy_flow.requires))
self.assertEqual(0, len(l7policy_flow.provides))
def test_get_delete_l7policy_flow(self):
@ -47,11 +47,10 @@ class TestL7PolicyFlows(base.TestCase):
self.assertIsInstance(l7policy_flow, flow.Flow)
self.assertIn(constants.LISTENERS, l7policy_flow.requires)
self.assertIn(constants.LOADBALANCER, l7policy_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, l7policy_flow.requires)
self.assertIn(constants.L7POLICY, l7policy_flow.requires)
self.assertEqual(4, len(l7policy_flow.requires))
self.assertEqual(3, len(l7policy_flow.requires))
self.assertEqual(0, len(l7policy_flow.provides))
def test_get_update_l7policy_flow(self):
@ -62,9 +61,8 @@ class TestL7PolicyFlows(base.TestCase):
self.assertIn(constants.L7POLICY, l7policy_flow.requires)
self.assertIn(constants.LISTENERS, l7policy_flow.requires)
self.assertIn(constants.LOADBALANCER, l7policy_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, l7policy_flow.requires)
self.assertIn(constants.UPDATE_DICT, l7policy_flow.requires)
self.assertEqual(5, len(l7policy_flow.requires))
self.assertEqual(4, len(l7policy_flow.requires))
self.assertEqual(0, len(l7policy_flow.provides))

View File

@ -121,7 +121,7 @@ class TestDatabaseTasks(base.TestCase):
self.health_mon_mock = {
constants.HEALTHMONITOR_ID: HM_ID,
constants.POOL_ID: POOL_ID,
'admin_state_up': True,
constants.ADMIN_STATE_UP: True,
}
self.listener_mock = mock.MagicMock()
@ -147,12 +147,15 @@ class TestDatabaseTasks(base.TestCase):
constants.POOL_ID: POOL_ID,
}
self.l7policy_mock = mock.MagicMock()
self.l7policy_mock.id = L7POLICY_ID
self.l7policy_mock = {
constants.L7POLICY_ID: L7POLICY_ID,
constants.ADMIN_STATE_UP: True,
}
self.l7rule_mock = mock.MagicMock()
self.l7rule_mock.id = L7RULE_ID
self.l7rule_mock.l7policy = self.l7policy_mock
self.l7rule_mock.l7policy = mock.MagicMock()
self.l7rule_mock.l7policy.id = L7POLICY_ID
self.amphora = {
constants.ID: AMP_ID,
@ -374,7 +377,7 @@ class TestDatabaseTasks(base.TestCase):
mock_amphora_repo_delete):
delete_l7policy = database_tasks.DeleteL7PolicyInDB()
delete_l7policy.execute(_l7policy_mock)
delete_l7policy.execute(self.l7policy_mock)
repo.L7PolicyRepository.delete.assert_called_once_with(
'TEST',
@ -383,7 +386,7 @@ class TestDatabaseTasks(base.TestCase):
# Test the revert
mock_l7policy_repo_delete.reset_mock()
delete_l7policy.revert(_l7policy_mock)
delete_l7policy.revert(self.l7policy_mock)
# TODO(sbalukoff) Fix
# repo.ListenerRepository.update.assert_called_once_with(
@ -2232,7 +2235,9 @@ class TestDatabaseTasks(base.TestCase):
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.L7PolicyRepository.update')
@mock.patch('octavia.db.repositories.L7PolicyRepository.get')
def test_mark_l7policy_active_in_db(self,
mock_l7policy_repo_get,
mock_l7policy_repo_update,
mock_generate_uuid,
mock_LOG,
@ -2243,6 +2248,7 @@ class TestDatabaseTasks(base.TestCase):
mock_amphora_repo_delete):
mark_l7policy_active = (database_tasks.MarkL7PolicyActiveInDB())
mock_l7policy_repo_get.return_value = _l7policy_mock
mark_l7policy_active.execute(self.l7policy_mock)
mock_l7policy_repo_update.assert_called_once_with(

View File

@ -147,16 +147,18 @@ class TestLifecycleTasks(base.TestCase):
L7PolicyToErrorOnRevertTask())
# Execute
l7policy_to_error_on_revert.execute(self.L7POLICY,
l7policy_to_error_on_revert.execute({constants.L7POLICY_ID:
self.L7POLICY_ID},
self.LISTENERS,
self.LOADBALANCER)
self.LOADBALANCER_ID)
self.assertFalse(mock_l7policy_prov_status_error.called)
# Revert
l7policy_to_error_on_revert.revert(self.L7POLICY,
l7policy_to_error_on_revert.revert({constants.L7POLICY_ID:
self.L7POLICY_ID},
self.LISTENERS,
self.LOADBALANCER)
self.LOADBALANCER_ID)
mock_l7policy_prov_status_error.assert_called_once_with(
self.L7POLICY_ID)

View File

@ -37,6 +37,7 @@ COMPUTE_ID = uuidutils.generate_uuid()
L7POLICY_ID = uuidutils.generate_uuid()
L7RULE_ID = uuidutils.generate_uuid()
PROJECT_ID = uuidutils.generate_uuid()
LISTENER_ID = uuidutils.generate_uuid()
HEALTH_UPDATE_DICT = {'delay': 1, 'timeout': 2}
LISTENER_UPDATE_DICT = {'name': 'test', 'description': 'test2'}
MEMBER_UPDATE_DICT = {'weight': 1, 'ip_address': '10.0.0.0'}
@ -71,6 +72,8 @@ _db_pool_mock = mock.MagicMock()
_db_pool_mock.load_balancer = _db_load_balancer_mock
_member_mock.pool = _db_pool_mock
_l7policy_mock = mock.MagicMock()
_l7policy_mock.id = L7POLICY_ID
_l7policy_mock.to_dict.return_value = {constants.ID: L7POLICY_ID}
_l7rule_mock = mock.MagicMock()
_create_map_flow_mock = mock.MagicMock()
_db_amphora_mock.load_balancer_id = LB_ID
@ -1090,25 +1093,22 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get):
_flow_mock.reset_mock()
mock_l7policy_repo_get.side_effect = [None, _l7policy_mock]
cw = controller_worker.ControllerWorker()
cw.create_l7policy(L7POLICY_ID)
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
_db_load_balancer_mock).to_dict()
l7policy_mock = {
constants.L7POLICY_ID: L7POLICY_ID,
constants.LISTENER_ID: LISTENER_ID
}
cw.create_l7policy(l7policy_mock)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.L7POLICY: _l7policy_mock,
constants.LOADBALANCER_ID:
LB_ID,
store={constants.L7POLICY: l7policy_mock,
constants.LISTENERS:
[self.ref_listener_dict],
constants.LOADBALANCER:
provider_lb}))
constants.LOADBALANCER_ID: LB_ID}))
_flow_mock.run.assert_called_once_with()
self.assertEqual(2, mock_l7policy_repo_get.call_count)
@mock.patch('octavia.controller.worker.v2.flows.'
'l7policy_flows.L7PolicyFlows.get_delete_l7policy_flow',
@ -1130,19 +1130,19 @@ class TestControllerWorker(base.TestCase):
_flow_mock.reset_mock()
cw = controller_worker.ControllerWorker()
cw.delete_l7policy(L7POLICY_ID)
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
_db_load_balancer_mock).to_dict()
l7policy_mock = {
constants.L7POLICY_ID: L7POLICY_ID,
constants.LISTENER_ID: LISTENER_ID
}
cw.delete_l7policy(l7policy_mock)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.L7POLICY: _l7policy_mock,
store={constants.L7POLICY: l7policy_mock,
constants.LISTENERS:
[self.ref_listener_dict],
constants.LOADBALANCER_ID:
LB_ID,
constants.LOADBALANCER:
provider_lb}))
LB_ID}))
_flow_mock.run.assert_called_once_with()
@ -1164,22 +1164,24 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get):
_flow_mock.reset_mock()
mock_listener_repo_get.return_value = _listener_mock
_l7policy_mock.provisioning_status = constants.PENDING_UPDATE
cw = controller_worker.ControllerWorker()
cw.update_l7policy(L7POLICY_ID, L7POLICY_UPDATE_DICT)
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
_db_load_balancer_mock).to_dict()
l7policy_mock = {
constants.L7POLICY_ID: L7POLICY_ID,
constants.LISTENER_ID: LISTENER_ID
}
cw.update_l7policy(l7policy_mock, L7POLICY_UPDATE_DICT)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.L7POLICY: _l7policy_mock,
store={constants.L7POLICY: l7policy_mock,
constants.LISTENERS:
[self.ref_listener_dict],
constants.LOADBALANCER_ID:
LB_ID,
constants.LOADBALANCER:
provider_lb,
constants.UPDATE_DICT:
L7POLICY_UPDATE_DICT}))
@ -1213,7 +1215,9 @@ class TestControllerWorker(base.TestCase):
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.L7RULE: _l7rule_mock,
constants.L7POLICY: _l7policy_mock,
constants.L7POLICY: {
constants.L7POLICY_ID:
L7POLICY_ID},
constants.LOADBALANCER_ID:
LB_ID,
constants.LISTENERS:
@ -1253,7 +1257,9 @@ class TestControllerWorker(base.TestCase):
store={constants.L7RULE: _l7rule_mock,
constants.LOADBALANCER_ID:
LB_ID,
constants.L7POLICY: _l7policy_mock,
constants.L7POLICY: {
constants.L7POLICY_ID:
L7POLICY_ID},
constants.LISTENERS:
[self.ref_listener_dict],
constants.LOADBALANCER:
@ -1289,7 +1295,9 @@ class TestControllerWorker(base.TestCase):
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.L7RULE: _l7rule_mock,
constants.L7POLICY: _l7policy_mock,
constants.L7POLICY: {
constants.L7POLICY_ID:
L7POLICY_ID},
constants.LOADBALANCER_ID:
LB_ID,
constants.LISTENERS: