Add L7 controller worker flows and tasks

This commit modifies the controller worker and its tasks and flows
in order to enable the manipulation of L7 policies and rules
in Octavia as well as unit tests for the same. It is one in a chain
of commits designed to keep the size of each individual commit
manageable / reviewable. Jinja template and documentation updates
will come in later commits.

Partially-Implements: blueprint lbaas-l7-rules
Partially-Implements: blueprint layer-7-switching
Change-Id: I13de3f89d9236cf508744b04c1d8de04296a34f3
This commit is contained in:
Stephen Balukoff 2016-02-09 02:15:11 -08:00
parent e4bdb86712
commit fbc4a0aa4d
11 changed files with 998 additions and 0 deletions

View File

@ -149,6 +149,8 @@ NICS = 'nics'
VIP = 'vip'
POOL = 'pool'
POOL_ID = 'pool_id'
L7POLICY = 'l7policy'
L7RULE = 'l7rule'
OBJECT = 'object'
SERVER_PEM = 'server_pem'
UPDATE_DICT = 'update_dict'
@ -167,12 +169,16 @@ CREATE_LISTENER_FLOW = 'octavia-create-listener_flow'
CREATE_LOADBALANCER_FLOW = 'octavia-create-loadbalancer-flow'
CREATE_MEMBER_FLOW = 'octavia-create-member-flow'
CREATE_POOL_FLOW = 'octavia-create-pool-flow'
CREATE_L7POLICY_FLOW = 'octavia-create-l7policy-flow'
CREATE_L7RULE_FLOW = 'octavia-create-l7rule-flow'
DELETE_AMPHORA_FLOW = 'octavia-delete-amphora-flow'
DELETE_HEALTH_MONITOR_FLOW = 'octavia-delete-health-monitor-flow'
DELETE_LISTENER_FLOW = 'octavia-delete-listener_flow'
DELETE_LOADBALANCER_FLOW = 'octavia-delete-loadbalancer-flow'
DELETE_MEMBER_FLOW = 'octavia-delete-member-flow'
DELETE_POOL_FLOW = 'octavia-delete-pool-flow'
DELETE_L7POLICY_FLOW = 'octavia-delete-l7policy-flow'
DELETE_L7RULE_FLOW = 'octavia-delete-l7policy-flow'
FAILOVER_AMPHORA_FLOW = 'octavia-failover-amphora-flow'
LOADBALANCER_NETWORKING_SUBFLOW = 'octavia-new-loadbalancer-net-subflow'
UPDATE_HEALTH_MONITOR_FLOW = 'octavia-update-health-monitor-flow'
@ -180,6 +186,8 @@ UPDATE_LISTENER_FLOW = 'octavia-update-listener-flow'
UPDATE_LOADBALANCER_FLOW = 'octavia-update-loadbalancer-flow'
UPDATE_MEMBER_FLOW = 'octavia-update-member-flow'
UPDATE_POOL_FLOW = 'octavia-update-pool-flow'
UPDATE_L7POLICY_FLOW = 'octavia-update-l7policy-flow'
UPDATE_L7RULE_FLOW = 'octavia-update-l7rule-flow'
WAIT_FOR_AMPHORA = 'wait-for-amphora'
FAILOVER_AMPHORA_FLOW = 'octavia-failover-amphora-flow'

View File

@ -101,3 +101,27 @@ class Endpoint(object):
def delete_member(self, context, member_id):
LOG.info(_LI('Deleting member \'%s\'...') % member_id)
self.worker.delete_member(member_id)
def create_l7policy(self, context, l7policy_id):
LOG.info(_LI('Creating l7policy \'%s\'...') % l7policy_id)
self.worker.create_l7policy(l7policy_id)
def update_l7policy(self, context, l7policy_id, l7policy_updates):
LOG.info(_LI('Updating l7policy \'%s\'...') % l7policy_id)
self.worker.update_l7policy(l7policy_id, l7policy_updates)
def delete_l7policy(self, context, l7policy_id):
LOG.info(_LI('Deleting l7policy \'%s\'...') % l7policy_id)
self.worker.delete_l7policy(l7policy_id)
def create_l7rule(self, context, l7rule_id):
LOG.info(_LI('Creating l7rule \'%s\'...') % l7rule_id)
self.worker.create_l7rule(l7rule_id)
def update_l7rule(self, context, l7rule_id, l7rule_updates):
LOG.info(_LI('Updating l7rule \'%s\'...') % l7rule_id)
self.worker.update_l7rule(l7rule_id, l7rule_updates)
def delete_l7rule(self, context, l7rule_id):
LOG.info(_LI('Deleting l7rule \'%s\'...') % l7rule_id)
self.worker.delete_l7rule(l7rule_id)

View File

@ -19,6 +19,8 @@ from octavia.common import base_taskflow
from octavia.common import constants
from octavia.controller.worker.flows import amphora_flows
from octavia.controller.worker.flows import health_monitor_flows
from octavia.controller.worker.flows import l7policy_flows
from octavia.controller.worker.flows import l7rule_flows
from octavia.controller.worker.flows import listener_flows
from octavia.controller.worker.flows import load_balancer_flows
from octavia.controller.worker.flows import member_flows
@ -45,6 +47,8 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
self._listener_flows = listener_flows.ListenerFlows()
self._member_flows = member_flows.MemberFlows()
self._pool_flows = pool_flows.PoolFlows()
self._l7policy_flows = l7policy_flows.L7PolicyFlows()
self._l7rule_flows = l7rule_flows.L7RuleFlows()
self._amphora_repo = repo.AmphoraRepository()
self._health_mon_repo = repo.HealthMonitorRepository()
@ -52,6 +56,8 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
self._listener_repo = repo.ListenerRepository()
self._member_repo = repo.MemberRepository()
self._pool_repo = repo.PoolRepository()
self._l7policy_repo = repo.L7PolicyRepository()
self._l7rule_repo = repo.L7RuleRepository()
self._exclude_result_logging_tasks = (
constants.ROLE_STANDALONE + '-' +
@ -476,6 +482,142 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
log=LOG):
update_pool_tf.run()
def create_l7policy(self, l7policy_id):
"""Creates an L7 Policy.
:param l7policy_id: ID of the l7policy to create
:returns: None
:raises NoSuitableLB: Unable to find the load balancer
"""
l7policy = self._l7policy_repo.get(db_apis.get_session(),
id=l7policy_id)
listeners = [l7policy.listener]
load_balancer = l7policy.listener.load_balancer
create_l7policy_tf = self._taskflow_load(
self._l7policy_flows.get_create_l7policy_flow(),
store={constants.L7POLICY: l7policy,
constants.LISTENERS: listeners,
constants.LOADBALANCER: load_balancer})
with tf_logging.DynamicLoggingListener(create_l7policy_tf,
log=LOG):
create_l7policy_tf.run()
def delete_l7policy(self, l7policy_id):
"""Deletes an L7 policy.
:param l7policy_id: ID of the l7policy to delete
:returns: None
:raises L7PolicyNotFound: The referenced l7policy was not found
"""
l7policy = self._l7policy_repo.get(db_apis.get_session(),
id=l7policy_id)
load_balancer = l7policy.listener.load_balancer
listeners = [l7policy.listener]
delete_l7policy_tf = self._taskflow_load(
self._l7policy_flows.get_delete_l7policy_flow(),
store={constants.L7POLICY: l7policy,
constants.LISTENERS: listeners,
constants.LOADBALANCER: load_balancer})
with tf_logging.DynamicLoggingListener(delete_l7policy_tf,
log=LOG):
delete_l7policy_tf.run()
def update_l7policy(self, l7policy_id, l7policy_updates):
"""Updates an L7 policy.
:param l7policy_id: ID of the l7policy to update
:param l7policy_updates: Dict containing updated l7policy attributes
:returns: None
:raises L7PolicyNotFound: The referenced l7policy was not found
"""
l7policy = self._l7policy_repo.get(db_apis.get_session(),
id=l7policy_id)
listeners = [l7policy.listener]
load_balancer = l7policy.listener.load_balancer
update_l7policy_tf = self._taskflow_load(
self._l7policy_flows.get_update_l7policy_flow(),
store={constants.L7POLICY: l7policy,
constants.LISTENERS: listeners,
constants.LOADBALANCER: load_balancer,
constants.UPDATE_DICT: l7policy_updates})
with tf_logging.DynamicLoggingListener(update_l7policy_tf,
log=LOG):
update_l7policy_tf.run()
def create_l7rule(self, l7rule_id):
"""Creates an L7 Rule.
:param l7rule_id: ID of the l7rule to create
:returns: None
:raises NoSuitableLB: Unable to find the load balancer
"""
l7rule = self._l7rule_repo.get(db_apis.get_session(),
id=l7rule_id)
listeners = [l7rule.l7policy.listener]
load_balancer = l7rule.l7policy.listener.load_balancer
create_l7rule_tf = self._taskflow_load(
self._l7rule_flows.get_create_l7rule_flow(),
store={constants.L7RULE: l7rule,
constants.LISTENERS: listeners,
constants.LOADBALANCER: load_balancer})
with tf_logging.DynamicLoggingListener(create_l7rule_tf,
log=LOG):
create_l7rule_tf.run()
def delete_l7rule(self, l7rule_id):
"""Deletes an L7 rule.
:param l7rule_id: ID of the l7rule to delete
:returns: None
:raises L7RuleNotFound: The referenced l7rule was not found
"""
l7rule = self._l7rule_repo.get(db_apis.get_session(),
id=l7rule_id)
load_balancer = l7rule.l7policy.listener.load_balancer
listeners = [l7rule.l7policy.listener]
delete_l7rule_tf = self._taskflow_load(
self._l7rule_flows.get_delete_l7rule_flow(),
store={constants.L7RULE: l7rule,
constants.LISTENERS: listeners,
constants.LOADBALANCER: load_balancer})
with tf_logging.DynamicLoggingListener(delete_l7rule_tf,
log=LOG):
delete_l7rule_tf.run()
def update_l7rule(self, l7rule_id, l7rule_updates):
"""Updates an L7 rule.
:param l7rule_id: ID of the l7rule to update
:param l7rule_updates: Dict containing updated l7rule attributes
:returns: None
:raises L7RuleNotFound: The referenced l7rule was not found
"""
l7rule = self._l7rule_repo.get(db_apis.get_session(),
id=l7rule_id)
listeners = [l7rule.l7policy.listener]
load_balancer = l7rule.l7policy.listener.load_balancer
update_l7rule_tf = self._taskflow_load(
self._l7rule_flows.get_update_l7rule_flow(),
store={constants.L7RULE: l7rule,
constants.LISTENERS: listeners,
constants.LOADBALANCER: load_balancer,
constants.UPDATE_DICT: l7rule_updates})
with tf_logging.DynamicLoggingListener(update_l7rule_tf,
log=LOG):
update_l7rule_tf.run()
def failover_amphora(self, amphora_id):
"""Perform failover operations for an amphora.

View File

@ -0,0 +1,73 @@
# Copyright 2016 Blue Box, an IBM Company
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from taskflow.patterns import linear_flow
from octavia.common import constants
from octavia.controller.worker.tasks import amphora_driver_tasks
from octavia.controller.worker.tasks import database_tasks
from octavia.controller.worker.tasks import model_tasks
class L7PolicyFlows(object):
def get_create_l7policy_flow(self):
"""Create a flow to create an L7 policy
:returns: The flow for creating an L7 policy
"""
create_l7policy_flow = linear_flow.Flow(constants.CREATE_L7POLICY_FLOW)
create_l7policy_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
create_l7policy_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
return create_l7policy_flow
def get_delete_l7policy_flow(self):
"""Create a flow to delete an L7 policy
:returns: The flow for deleting an L7 policy
"""
delete_l7policy_flow = linear_flow.Flow(constants.DELETE_L7POLICY_FLOW)
delete_l7policy_flow.add(model_tasks.DeleteModelObject(
rebind={constants.OBJECT: constants.L7POLICY}))
delete_l7policy_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
delete_l7policy_flow.add(database_tasks.DeleteL7PolicyInDB(
requires=constants.L7POLICY))
delete_l7policy_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
return delete_l7policy_flow
def get_update_l7policy_flow(self):
"""Create a flow to update an L7 policy
:returns: The flow for updating an L7 policy
"""
update_l7policy_flow = linear_flow.Flow(constants.UPDATE_L7POLICY_FLOW)
update_l7policy_flow.add(
model_tasks.UpdateAttributes(
rebind={constants.OBJECT: constants.L7POLICY},
requires=[constants.UPDATE_DICT]))
update_l7policy_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
update_l7policy_flow.add(database_tasks.UpdateL7PolicyInDB(
requires=[constants.L7POLICY, constants.UPDATE_DICT]))
update_l7policy_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
return update_l7policy_flow

View File

@ -0,0 +1,73 @@
# Copyright 2016 Blue Box, an IBM Company
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from taskflow.patterns import linear_flow
from octavia.common import constants
from octavia.controller.worker.tasks import amphora_driver_tasks
from octavia.controller.worker.tasks import database_tasks
from octavia.controller.worker.tasks import model_tasks
class L7RuleFlows(object):
def get_create_l7rule_flow(self):
"""Create a flow to create an L7 rule
:returns: The flow for creating an L7 rule
"""
create_l7rule_flow = linear_flow.Flow(constants.CREATE_L7RULE_FLOW)
create_l7rule_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
create_l7rule_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
return create_l7rule_flow
def get_delete_l7rule_flow(self):
"""Create a flow to delete an L7 rule
:returns: The flow for deleting an L7 rule
"""
delete_l7rule_flow = linear_flow.Flow(constants.DELETE_L7RULE_FLOW)
delete_l7rule_flow.add(model_tasks.DeleteModelObject(
rebind={constants.OBJECT: constants.L7RULE}))
delete_l7rule_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
delete_l7rule_flow.add(database_tasks.DeleteL7RuleInDB(
requires=constants.L7RULE))
delete_l7rule_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
return delete_l7rule_flow
def get_update_l7rule_flow(self):
"""Create a flow to update an L7 rule
:returns: The flow for updating an L7 rule
"""
update_l7rule_flow = linear_flow.Flow(constants.UPDATE_L7RULE_FLOW)
update_l7rule_flow.add(
model_tasks.UpdateAttributes(
rebind={constants.OBJECT: constants.L7RULE},
requires=[constants.UPDATE_DICT]))
update_l7rule_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
update_l7rule_flow.add(database_tasks.UpdateL7RuleInDB(
requires=[constants.L7RULE, constants.UPDATE_DICT]))
update_l7rule_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
return update_l7rule_flow

View File

@ -46,6 +46,8 @@ class BaseDatabaseTask(task.Task):
self.member_repo = repo.MemberRepository()
self.pool_repo = repo.PoolRepository()
self.amp_health_repo = repo.AmphoraHealthRepository()
self.l7policy_repo = repo.L7PolicyRepository()
self.l7rule_repo = repo.L7RuleRepository()
super(BaseDatabaseTask, self).__init__(**kwargs)
def _delete_from_amp_health(self, amphora_id):
@ -238,6 +240,65 @@ class DeletePoolInDB(BaseDatabaseTask):
# operating_status=constants.ERROR)
class DeleteL7PolicyInDB(BaseDatabaseTask):
"""Delete the L7 policy in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, l7policy):
"""Delete the l7policy in DB
:param l7policy: The l7policy to be deleted
:returns: None
"""
LOG.debug("Delete in DB for l7policy id: %s ", l7policy.id)
self.l7policy_repo.delete(db_apis.get_session(), id=l7policy.id)
def revert(self, l7policy_id, *args, **kwargs):
"""Mark the l7policy ERROR since the delete couldn't happen
:returns: None
"""
LOG.warn(_LW("Reverting delete in DB "
"for l7policy id %s"), l7policy_id)
# TODO(sbalukoff) Fix this
# self.listener_repo.update(db_apis.get_session(), l7policy.listener.id,
# operating_status=constants.ERROR)
class DeleteL7RuleInDB(BaseDatabaseTask):
"""Delete the L7 rule in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, l7rule):
"""Delete the l7rule in DB
:param l7rule: The l7rule to be deleted
:returns: None
"""
LOG.debug("Delete in DB for l7rule id: %s ", l7rule.id)
self.l7rule_repo.delete(db_apis.get_session(), id=l7rule.id)
def revert(self, l7rule_id, *args, **kwargs):
"""Mark the l7rule ERROR since the delete couldn't happen
:returns: None
"""
LOG.warn(_LW("Reverting delete in DB "
"for l7rule id %s"), l7rule_id)
# TODO(sbalukoff) Fix this
# self.listener_repo.update(db_apis.get_session(),
# l7rule.l7policy.listener.id,
# operating_status=constants.ERROR)
class ReloadAmphora(BaseDatabaseTask):
"""Get an amphora object from the database."""
@ -954,6 +1015,68 @@ class UpdatePoolInDB(BaseDatabaseTask):
pool.id, {'enabled': 0}, None)
class UpdateL7PolicyInDB(BaseDatabaseTask):
"""Update the L7 policy in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, l7policy, update_dict):
"""Update the L7 policy in the DB
:param l7policy: The L7 policy to be updated
:param update_dict: The dictionary of updates to apply
:returns: None
"""
LOG.debug("Update DB for l7policy id: %s ", l7policy.id)
self.l7policy_repo.update(db_apis.get_session(), l7policy.id,
**update_dict)
def revert(self, l7policy, *args, **kwargs):
"""Mark the l7policy ERROR since the update couldn't happen
:returns: None
"""
LOG.warn(_LW("Reverting update l7policy in DB "
"for l7policy id %s"), l7policy.id)
# TODO(sbalukoff) fix this to set the upper objects to ERROR
self.l7policy_repo.update(db_apis.get_session(), l7policy.id,
enabled=0)
class UpdateL7RuleInDB(BaseDatabaseTask):
"""Update the L7 rule in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, l7rule, update_dict):
"""Update the L7 rule in the DB
:param l7rule: The L7 rule to be updated
:param update_dict: The dictionary of updates to apply
:returns: None
"""
LOG.debug("Update DB for l7rule id: %s ", l7rule.id)
self.l7rule_repo.update(db_apis.get_session(), l7rule.id,
**update_dict)
def revert(self, l7rule, *args, **kwargs):
"""Mark the L7 rule ERROR since the update couldn't happen
:returns: None
"""
LOG.warn(_LW("Reverting update l7rule in DB "
"for l7rule id %s"), l7rule.id)
# TODO(sbalukoff) fix this to set appropriate upper objects to ERROR
self.l7policy_repo.update(db_apis.get_session(), l7rule.l7policy.id,
enabled=0)
class GetAmphoraDetails(BaseDatabaseTask):
"""Task to retrieve amphora network details."""

View File

@ -117,3 +117,35 @@ class TestEndpoint(base.TestCase):
self.ep.delete_member(self.context, self.resource_id)
self.ep.worker.delete_member.assert_called_once_with(
self.resource_id)
def test_create_l7policy(self):
self.ep.create_l7policy(self.context, self.resource_id)
self.ep.worker.create_l7policy.assert_called_once_with(
self.resource_id)
def test_update_l7policy(self):
self.ep.update_l7policy(self.context, self.resource_id,
self.resource_updates)
self.ep.worker.update_l7policy.assert_called_once_with(
self.resource_id, self.resource_updates)
def test_delete_l7policy(self):
self.ep.delete_l7policy(self.context, self.resource_id)
self.ep.worker.delete_l7policy.assert_called_once_with(
self.resource_id)
def test_create_l7rule(self):
self.ep.create_l7rule(self.context, self.resource_id)
self.ep.worker.create_l7rule.assert_called_once_with(
self.resource_id)
def test_update_l7rule(self):
self.ep.update_l7rule(self.context, self.resource_id,
self.resource_updates)
self.ep.worker.update_l7rule.assert_called_once_with(
self.resource_id, self.resource_updates)
def test_delete_l7rule(self):
self.ep.delete_l7rule(self.context, self.resource_id)
self.ep.worker.delete_l7rule.assert_called_once_with(
self.resource_id)

View File

@ -0,0 +1,67 @@
# Copyright 2016 Blue Box, an IBM Company
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from taskflow.patterns import linear_flow as flow
from octavia.common import constants
from octavia.controller.worker.flows import l7policy_flows
import octavia.tests.unit.base as base
class TestL7PolicyFlows(base.TestCase):
def setUp(self):
self.L7PolicyFlow = l7policy_flows.L7PolicyFlows()
super(TestL7PolicyFlows, self).setUp()
def test_get_create_l7policy_flow(self):
l7policy_flow = self.L7PolicyFlow.get_create_l7policy_flow()
self.assertIsInstance(l7policy_flow, flow.Flow)
self.assertIn(constants.LISTENERS, l7policy_flow.requires)
self.assertIn(constants.LOADBALANCER, l7policy_flow.requires)
self.assertEqual(2, len(l7policy_flow.requires))
self.assertEqual(0, len(l7policy_flow.provides))
def test_get_delete_l7policy_flow(self):
l7policy_flow = self.L7PolicyFlow.get_delete_l7policy_flow()
self.assertIsInstance(l7policy_flow, flow.Flow)
self.assertIn(constants.LISTENERS, l7policy_flow.requires)
self.assertIn(constants.LOADBALANCER, l7policy_flow.requires)
self.assertIn(constants.L7POLICY, l7policy_flow.requires)
self.assertEqual(3, len(l7policy_flow.requires))
self.assertEqual(0, len(l7policy_flow.provides))
def test_get_update_l7policy_flow(self):
l7policy_flow = self.L7PolicyFlow.get_update_l7policy_flow()
self.assertIsInstance(l7policy_flow, flow.Flow)
self.assertIn(constants.L7POLICY, l7policy_flow.requires)
self.assertIn(constants.LISTENERS, l7policy_flow.requires)
self.assertIn(constants.LOADBALANCER, l7policy_flow.requires)
self.assertIn(constants.UPDATE_DICT, l7policy_flow.requires)
self.assertEqual(4, len(l7policy_flow.requires))
self.assertEqual(0, len(l7policy_flow.provides))

View File

@ -0,0 +1,67 @@
# Copyright 2016 Blue Box, an IBM Company
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from taskflow.patterns import linear_flow as flow
from octavia.common import constants
from octavia.controller.worker.flows import l7rule_flows
import octavia.tests.unit.base as base
class TestL7RuleFlows(base.TestCase):
def setUp(self):
self.L7RuleFlow = l7rule_flows.L7RuleFlows()
super(TestL7RuleFlows, self).setUp()
def test_get_create_l7rule_flow(self):
l7rule_flow = self.L7RuleFlow.get_create_l7rule_flow()
self.assertIsInstance(l7rule_flow, flow.Flow)
self.assertIn(constants.LISTENERS, l7rule_flow.requires)
self.assertIn(constants.LOADBALANCER, l7rule_flow.requires)
self.assertEqual(2, len(l7rule_flow.requires))
self.assertEqual(0, len(l7rule_flow.provides))
def test_get_delete_l7rule_flow(self):
l7rule_flow = self.L7RuleFlow.get_delete_l7rule_flow()
self.assertIsInstance(l7rule_flow, flow.Flow)
self.assertIn(constants.LISTENERS, l7rule_flow.requires)
self.assertIn(constants.LOADBALANCER, l7rule_flow.requires)
self.assertIn(constants.L7RULE, l7rule_flow.requires)
self.assertEqual(3, len(l7rule_flow.requires))
self.assertEqual(0, len(l7rule_flow.provides))
def test_get_update_l7rule_flow(self):
l7rule_flow = self.L7RuleFlow.get_update_l7rule_flow()
self.assertIsInstance(l7rule_flow, flow.Flow)
self.assertIn(constants.L7RULE, l7rule_flow.requires)
self.assertIn(constants.LISTENERS, l7rule_flow.requires)
self.assertIn(constants.LOADBALANCER, l7rule_flow.requires)
self.assertIn(constants.UPDATE_DICT, l7rule_flow.requires)
self.assertEqual(4, len(l7rule_flow.requires))
self.assertEqual(0, len(l7rule_flow.provides))

View File

@ -37,6 +37,8 @@ PORT_ID = uuidutils.generate_uuid()
SUBNET_ID = uuidutils.generate_uuid()
VRRP_PORT_ID = uuidutils.generate_uuid()
HA_PORT_ID = uuidutils.generate_uuid()
L7POLICY_ID = uuidutils.generate_uuid()
L7RULE_ID = uuidutils.generate_uuid()
VIP_IP = '192.0.5.2'
VRRP_IP = '192.0.5.3'
HA_IP = '192.0.5.4'
@ -61,6 +63,10 @@ _loadbalancer_mock.id = LB_ID
_loadbalancer_mock.amphorae = [_amphora_mock]
_pool_mock = mock.MagicMock()
_pool_mock.id = POOL_ID
_l7policy_mock = mock.MagicMock()
_l7policy_mock.id = L7POLICY_ID
_l7rule_mock = mock.MagicMock()
_l7rule_mock.id = L7RULE_ID
_listener_mock = mock.MagicMock()
_listener_mock.id = LISTENER_ID
_tf_failure_mock = mock.Mock(spec=failure.Failure)
@ -134,6 +140,13 @@ class TestDatabaseTasks(base.TestCase):
self.pool_mock = mock.MagicMock()
self.pool_mock.id = POOL_ID
self.l7policy_mock = mock.MagicMock()
self.l7policy_mock.id = L7POLICY_ID
self.l7rule_mock = mock.MagicMock()
self.l7rule_mock.id = L7RULE_ID
self.l7rule_mock.l7policy = self.l7policy_mock
super(TestDatabaseTasks, self).setUp()
@mock.patch('octavia.db.repositories.AmphoraRepository.create',
@ -278,6 +291,64 @@ class TestDatabaseTasks(base.TestCase):
# repo.PoolRepository.update.assert_called_once_with(
# 'TEST',
# POOL_ID,
# operating_status=constants.ERROR)
@mock.patch('octavia.db.repositories.L7PolicyRepository.delete')
def test_delete_l7policy_in_db(self,
mock_l7policy_repo_delete,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
delete_l7policy = database_tasks.DeleteL7PolicyInDB()
delete_l7policy.execute(_l7policy_mock)
repo.L7PolicyRepository.delete.assert_called_once_with(
'TEST',
id=L7POLICY_ID)
# Test the revert
mock_l7policy_repo_delete.reset_mock()
delete_l7policy.revert(L7POLICY_ID)
# TODO(sbalukoff) Fix
# repo.ListenerRepository.update.assert_called_once_with(
# 'TEST',
# LISTENER_ID,
# operating_status=constants.ERROR)
@mock.patch('octavia.db.repositories.L7RuleRepository.delete')
def test_delete_l7rule_in_db(self,
mock_l7rule_repo_delete,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
delete_l7rule = database_tasks.DeleteL7RuleInDB()
delete_l7rule.execute(_l7rule_mock)
repo.L7RuleRepository.delete.assert_called_once_with(
'TEST',
id=L7RULE_ID)
# Test the revert
mock_l7rule_repo_delete.reset_mock()
delete_l7rule.revert(L7RULE_ID)
# TODO(sbalukoff) Fix
# repo.ListenerRepository.update.assert_called_once_with(
# 'TEST',
# LISTENER_ID,
# operating_status=constants.ERROR)
@mock.patch('octavia.db.repositories.AmphoraRepository.get',
@ -1046,6 +1117,75 @@ class TestDatabaseTasks(base.TestCase):
POOL_ID,
{'enabled': 0}, None)
@mock.patch('octavia.db.repositories.L7PolicyRepository.update')
def test_update_l7policy_in_db(self,
mock_l7policy_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
update_l7policy = database_tasks.UpdateL7PolicyInDB()
update_l7policy.execute(self.l7policy_mock,
{'action': constants.L7POLICY_ACTION_REJECT})
repo.L7PolicyRepository.update.assert_called_once_with(
'TEST',
L7POLICY_ID,
action=constants.L7POLICY_ACTION_REJECT)
# Test the revert
mock_l7policy_repo_update.reset_mock()
update_l7policy.revert(self.l7policy_mock)
# TODO(sbalukoff) fix this to set the upper objects to ERROR
repo.L7PolicyRepository.update.assert_called_once_with(
'TEST',
L7POLICY_ID,
enabled=0)
@mock.patch('octavia.db.repositories.L7RuleRepository.update')
@mock.patch('octavia.db.repositories.L7PolicyRepository.update')
def test_update_l7rule_in_db(self,
mock_l7rule_repo_update,
mock_l7policy_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
update_l7rule = database_tasks.UpdateL7RuleInDB()
update_l7rule.execute(
self.l7rule_mock,
{'type': constants.L7RULE_TYPE_PATH,
'compare_type': constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'value': '/api'})
repo.L7RuleRepository.update.assert_called_once_with(
'TEST',
L7RULE_ID,
type=constants.L7RULE_TYPE_PATH,
compare_type=constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
value='/api')
# Test the revert
mock_l7rule_repo_update.reset_mock()
update_l7rule.revert(self.l7rule_mock)
# TODO(sbalukoff) fix this to set the upper objects to ERROR
repo.L7PolicyRepository.update.assert_called_once_with(
'TEST',
L7POLICY_ID,
enabled=0)
def test_get_amphora_details(self,
mock_generate_uuid,
mock_LOG,

View File

@ -29,10 +29,17 @@ POOL_ID = uuidutils.generate_uuid()
HM_ID = uuidutils.generate_uuid()
MEMBER_ID = uuidutils.generate_uuid()
COMPUTE_ID = uuidutils.generate_uuid()
L7POLICY_ID = uuidutils.generate_uuid()
L7RULE_ID = uuidutils.generate_uuid()
HEALTH_UPDATE_DICT = {'delay': 1, 'timeout': 2}
LISTENER_UPDATE_DICT = {'name': 'test', 'description': 'test2'}
MEMBER_UPDATE_DICT = {'weight': 1, 'ip_address': '10.0.0.0'}
POOL_UPDATE_DICT = {'name': 'test', 'description': 'test2'}
L7POLICY_UPDATE_DICT = {'action': constants.L7POLICY_ACTION_REJECT}
L7RULE_UPDATE_DICT = {
'type': constants.L7RULE_TYPE_PATH,
'compare_type': constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'value': '/api'}
_amphora_mock = mock.MagicMock()
_flow_mock = mock.MagicMock()
@ -42,6 +49,8 @@ _listener_mock = mock.MagicMock()
_load_balancer_mock = mock.MagicMock()
_member_mock = mock.MagicMock()
_pool_mock = mock.MagicMock()
_l7policy_mock = mock.MagicMock()
_l7rule_mock = mock.MagicMock()
_create_map_flow_mock = mock.MagicMock()
_amphora_mock.load_balancer_id = LB_ID
_amphora_mock.id = AMP_ID
@ -57,6 +66,10 @@ CONF = cfg.CONF
return_value=_load_balancer_mock)
@mock.patch('octavia.db.repositories.ListenerRepository.get',
return_value=_listener_mock)
@mock.patch('octavia.db.repositories.L7PolicyRepository.get',
return_value=_l7policy_mock)
@mock.patch('octavia.db.repositories.L7RuleRepository.get',
return_value=_l7rule_mock)
@mock.patch('octavia.db.repositories.MemberRepository.get',
return_value=_member_mock)
@mock.patch('octavia.db.repositories.PoolRepository.get',
@ -82,6 +95,8 @@ class TestControllerWorker(base.TestCase):
_pool_mock.listeners = [_listener_mock]
_pool_mock.load_balancer = _load_balancer_mock
_pool_mock.load_balancer.vip = _vip_mock
_l7policy_mock.listener = _listener_mock
_l7rule_mock.l7policy = _l7policy_mock
fetch_mock = mock.MagicMock(return_value=AMP_ID)
_flow_mock.storage.fetch = fetch_mock
@ -101,6 +116,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
@ -130,6 +147,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
@ -161,6 +180,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
@ -193,6 +214,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
@ -226,6 +249,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
@ -260,6 +285,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
@ -289,6 +316,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
@ -316,6 +345,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
@ -351,6 +382,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
@ -417,6 +450,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
@ -448,6 +483,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
@ -481,6 +518,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
@ -511,6 +550,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
@ -541,6 +582,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
@ -573,6 +616,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
@ -603,6 +648,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
@ -633,6 +680,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
@ -655,6 +704,202 @@ class TestControllerWorker(base.TestCase):
_flow_mock.run.assert_called_once_with()
@mock.patch('octavia.controller.worker.flows.'
'l7policy_flows.L7PolicyFlows.get_create_l7policy_flow',
return_value=_flow_mock)
def test_create_l7policy(self,
mock_get_create_listener_flow,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
mock_amp_repo_get):
_flow_mock.reset_mock()
cw = controller_worker.ControllerWorker()
cw.create_l7policy(L7POLICY_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.L7POLICY: _l7policy_mock,
constants.LISTENERS:
[_listener_mock],
constants.LOADBALANCER:
_load_balancer_mock}))
_flow_mock.run.assert_called_once_with()
@mock.patch('octavia.controller.worker.flows.'
'l7policy_flows.L7PolicyFlows.get_delete_l7policy_flow',
return_value=_flow_mock)
def test_delete_l7policy(self,
mock_get_delete_listener_flow,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
mock_amp_repo_get):
_flow_mock.reset_mock()
cw = controller_worker.ControllerWorker()
cw.delete_l7policy(L7POLICY_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.L7POLICY: _l7policy_mock,
constants.LISTENERS:
[_listener_mock],
constants.LOADBALANCER:
_load_balancer_mock}))
_flow_mock.run.assert_called_once_with()
@mock.patch('octavia.controller.worker.flows.'
'l7policy_flows.L7PolicyFlows.get_update_l7policy_flow',
return_value=_flow_mock)
def test_update_l7policy(self,
mock_get_update_listener_flow,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
mock_amp_repo_get):
_flow_mock.reset_mock()
cw = controller_worker.ControllerWorker()
cw.update_l7policy(L7POLICY_ID, L7POLICY_UPDATE_DICT)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.L7POLICY: _l7policy_mock,
constants.LISTENERS:
[_listener_mock],
constants.LOADBALANCER:
_load_balancer_mock,
constants.UPDATE_DICT:
L7POLICY_UPDATE_DICT}))
_flow_mock.run.assert_called_once_with()
@mock.patch('octavia.controller.worker.flows.'
'l7rule_flows.L7RuleFlows.get_create_l7rule_flow',
return_value=_flow_mock)
def test_create_l7rule(self,
mock_get_create_listener_flow,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
mock_amp_repo_get):
_flow_mock.reset_mock()
cw = controller_worker.ControllerWorker()
cw.create_l7rule(L7RULE_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.L7RULE: _l7rule_mock,
constants.LISTENERS:
[_listener_mock],
constants.LOADBALANCER:
_load_balancer_mock}))
_flow_mock.run.assert_called_once_with()
@mock.patch('octavia.controller.worker.flows.'
'l7rule_flows.L7RuleFlows.get_delete_l7rule_flow',
return_value=_flow_mock)
def test_delete_l7rule(self,
mock_get_delete_listener_flow,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
mock_amp_repo_get):
_flow_mock.reset_mock()
cw = controller_worker.ControllerWorker()
cw.delete_l7rule(L7RULE_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.L7RULE: _l7rule_mock,
constants.LISTENERS:
[_listener_mock],
constants.LOADBALANCER:
_load_balancer_mock}))
_flow_mock.run.assert_called_once_with()
@mock.patch('octavia.controller.worker.flows.'
'l7rule_flows.L7RuleFlows.get_update_l7rule_flow',
return_value=_flow_mock)
def test_update_l7rule(self,
mock_get_update_listener_flow,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
mock_amp_repo_get):
_flow_mock.reset_mock()
cw = controller_worker.ControllerWorker()
cw.update_l7rule(L7RULE_ID, L7RULE_UPDATE_DICT)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.L7RULE: _l7rule_mock,
constants.LISTENERS:
[_listener_mock],
constants.LOADBALANCER:
_load_balancer_mock,
constants.UPDATE_DICT:
L7RULE_UPDATE_DICT}))
_flow_mock.run.assert_called_once_with()
@mock.patch('octavia.controller.worker.flows.'
'amphora_flows.AmphoraFlows.get_failover_flow',
return_value=_flow_mock)
@ -665,6 +910,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
@ -694,6 +941,8 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,