Removed_legacy_service_chain_code

Change-Id: Ie7bcf691a48bcaedb9f5a8413136592608eb3897
This commit is contained in:
pulkitvajpayee07
2022-04-11 10:49:03 +05:30
parent ceeeb71a0a
commit e170eae3e0
214 changed files with 0 additions and 45606 deletions

View File

@@ -1,300 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import collections
from neutron.common import config
from neutron_lib import context as n_ctx
from oslo_config import cfg
from oslo_serialization import jsonutils
from gbpservice.neutron.services.servicechain.plugins.ncp import (
plugin as ncp_plugin)
from gbpservice.neutron.services.servicechain.plugins.ncp import context
from gbpservice.neutron.tests.unit.db.grouppolicy import (
test_servicechain_db as test_servicechain_db)
from gbpservice.neutron.tests.unit.db.grouppolicy import test_group_policy_db
cfg.CONF.import_opt(
'node_drivers',
'gbpservice.neutron.services.servicechain.plugins.ncp.config',
group='node_composition_plugin')
class ServiceChainNCPTestPlugin(ncp_plugin.NodeCompositionPlugin):
supported_extension_aliases = ['servicechain'] + (
test_group_policy_db.UNSUPPORTED_REQUIRED_EXTS)
path_prefix = "/servicechain"
SC_PLUGIN_KLASS = (ServiceChainNCPTestPlugin.__module__ + '.' +
ServiceChainNCPTestPlugin.__name__)
class ServiceChainPluginTestCase(test_servicechain_db.ServiceChainDbTestCase):
def setUp(self, core_plugin=None, sc_plugin=None, gp_plugin=None):
super(ServiceChainPluginTestCase, self).setUp(core_plugin=core_plugin,
sc_plugin=sc_plugin or
SC_PLUGIN_KLASS,
gp_plugin=gp_plugin)
try:
config.cfg.CONF.keystone_authtoken.username
except config.cfg.NoSuchOptError:
config.cfg.CONF.register_opt(
config.cfg.StrOpt('username'),
'keystone_authtoken')
try:
config.cfg.CONF.keystone_authtoken.password
except config.cfg.NoSuchOptError:
config.cfg.CONF.register_opt(
config.cfg.StrOpt('password'),
'keystone_authtoken')
try:
config.cfg.CONF.keystone_authtoken.project_name
except config.cfg.NoSuchOptError:
config.cfg.CONF.register_opt(
config.cfg.StrOpt('project_name'),
'keystone_authtoken')
class BaseTestGroupPolicyPluginGroupResources(
ServiceChainPluginTestCase,
test_servicechain_db.TestServiceChainResources):
def test_spec_shared(self):
# Shared spec can only point shared nodes
node = self._create_profiled_servicechain_node(
'LOADBALANCERV2', shared=True, shared_profile=True,
profile_tenant_id='admin', tenant_id='admin')['servicechain_node']
self.create_servicechain_spec(nodes=[node['id']], shared=True,
expected_res_status=201)
self.create_servicechain_spec(nodes=[node['id']], shared=False,
tenant_id='admin',
expected_res_status=201)
node = self._create_profiled_servicechain_node(
'LOADBALANCERV2', shared=False, profile_tenant_id='nonadmin',
tenant_id='nonadmin')['servicechain_node']
self.create_servicechain_spec(nodes=[node['id']], shared=True,
expected_res_status=404)
self.create_servicechain_spec(nodes=[node['id']], shared=True,
tenant_id='nonadmin',
expected_res_status=400)
self.create_servicechain_spec(nodes=[node['id']], shared=False,
tenant_id='nonadmin',
expected_res_status=201)
def test_node_shared(self):
# Shared node can only point shared profile
prof = self.create_service_profile(
service_type='LOADBALANCERV2', shared=True,
tenant_id='admin')['service_profile']
to_update = self.create_servicechain_node(
service_profile_id=prof['id'], shared=True,
expected_res_status=201)['servicechain_node']
self.create_servicechain_node(
service_profile_id=prof['id'], shared=False, tenant_id='admin',
expected_res_status=201)
prof = self.create_service_profile(
service_type='LOADBALANCERV2', shared=False,
tenant_id='admin')['service_profile']
self.create_servicechain_node(
service_profile_id=prof['id'], shared=True,
expected_res_status=404)
self.create_servicechain_node(
service_profile_id=prof['id'], shared=True,
tenant_id='admin', expected_res_status=400)
self.create_servicechain_node(
service_profile_id=prof['id'], shared=False,
tenant_id='admin', expected_res_status=201)
self.create_servicechain_spec(nodes=[to_update['id']], shared=True,
tenant_id='nonadmin',
expected_res_status=201)
data = {'servicechain_node': {'shared': False}}
req = self.new_update_request('servicechain_nodes', data,
to_update['id'])
res = req.get_response(self.ext_api)
self.assertEqual(400, res.status_int)
res = self.deserialize(self.fmt, res)
self.assertEqual('InvalidSharedAttributeUpdate',
res['NeutronError']['type'])
def test_profile_shared(self):
prof = self.create_service_profile(
service_type='LOADBALANCERV2', shared=True,
tenant_id='admin')['service_profile']
self.create_servicechain_node(
service_profile_id=prof['id'], shared=True,
expected_res_status=201)
data = {'service_profile': {'shared': False}}
req = self.new_update_request('service_profiles', data,
prof['id'])
res = req.get_response(self.ext_api)
self.assertEqual(400, res.status_int)
res = self.deserialize(self.fmt, res)
self.assertEqual('InvalidSharedAttributeUpdate',
res['NeutronError']['type'])
prof = self.create_service_profile(
service_type='LOADBALANCERV2', shared=False)['service_profile']
self.create_servicechain_node(
service_profile_id=prof['id'], shared=False,
expected_res_status=201)
data = {'service_profile': {'shared': True}}
req = self.new_update_request('service_profiles', data,
prof['id'])
res = req.get_response(self.ext_api)
self.assertEqual(200, res.status_int)
res = self.deserialize(self.fmt, res)
self.assertTrue(res['service_profile']['shared'])
def test_node_context_profile(self):
# Current node with profile
plugin_context = n_ctx.get_admin_context()
plugin_context.is_admin = plugin_context.is_advsvc = False
plugin_context.tenant_id = self._tenant_id
prof = self.create_service_profile(
service_type='LOADBALANCERV2')['service_profile']
current = self.create_servicechain_node(
service_profile_id=prof['id'],
expected_res_status=201)['servicechain_node']
ctx = context.NodeDriverContext(self.plugin, plugin_context,
None, None, current, 0,
prof, None)
self.assertIsNone(ctx.original_node)
self.assertIsNone(ctx.original_profile)
self.assertEqual(ctx.current_node, current)
self.assertEqual(ctx.current_profile, prof)
# Original node with profile
prof2 = self.create_service_profile(
service_type='LOADBALANCERV2')['service_profile']
original = self.create_servicechain_node(
service_profile_id=prof2['id'],
expected_res_status=201)['servicechain_node']
ctx = context.NodeDriverContext(
self.plugin, plugin_context, None, None, current, 0,
prof, None, original_service_chain_node=original,
original_service_profile=prof2)
self.assertEqual(ctx.original_node, original)
self.assertEqual(ctx.original_profile, prof2)
self.assertEqual(ctx.current_node, current)
self.assertEqual(ctx.current_profile, prof)
def test_node_context_no_profile(self):
plugin_context = n_ctx.get_admin_context()
plugin_context.is_admin = plugin_context.is_advsvc = False
plugin_context.tenant_id = 'test_tenant'
current = self.create_servicechain_node(
service_type='TEST',
expected_res_status=201)['servicechain_node']
ctx = context.NodeDriverContext(self.plugin, plugin_context,
None, None, current, 0,
None, None)
self.assertIsNone(ctx.original_node)
self.assertIsNone(ctx.original_profile)
self.assertEqual(ctx.current_node, current)
self.assertIsNone(ctx.current_profile)
original = self.create_servicechain_node(
service_type='TEST',
expected_res_status=201)['servicechain_node']
ctx = context.NodeDriverContext(
self.plugin, plugin_context, None, None, current, 0,
None, None, original_service_chain_node=original)
self.assertEqual(ctx.original_node, original)
self.assertIsNone(ctx.original_profile)
self.assertEqual(ctx.current_node, current)
self.assertIsNone(ctx.current_profile)
def test_spec_parameters(self):
params_node_1 = ['p1', 'p2', 'p3']
params_node_2 = ['p4', 'p5', 'p6']
params_node_3 = ['p7', 'p8', 'p9']
def params_dict(params):
return jsonutils.dumps({'Parameters':
dict((x, {}) for x in params)})
prof = self.create_service_profile(
service_type='LOADBALANCERV2', shared=True,
tenant_id='admin')['service_profile']
# Create 2 nodes with different parameters
node1 = self.create_servicechain_node(
service_profile_id=prof['id'], shared=True,
config=params_dict(params_node_1),
expected_res_status=201)['servicechain_node']
node2 = self.create_servicechain_node(
service_profile_id=prof['id'], shared=True,
config=params_dict(params_node_2),
expected_res_status=201)['servicechain_node']
# Create SC spec with the nodes assigned
spec = self.create_servicechain_spec(
nodes=[node1['id'], node2['id']], shared=True,
expected_res_status=201)['servicechain_spec']
# Verify param names correspondence
self.assertEqual(
collections.Counter(params_node_1 + params_node_2),
collections.Counter(ast.literal_eval(spec['config_param_names'])))
# Update the spec removing one node
self.update_servicechain_spec(spec['id'], nodes=[node1['id']],
expected_res_status=200)
spec = self.show_servicechain_spec(spec['id'])['servicechain_spec']
# Verify param names correspondence
self.assertEqual(
collections.Counter(params_node_1),
collections.Counter(ast.literal_eval(spec['config_param_names'])))
# Update the spec without modifying the node list
self.update_servicechain_spec(spec['id'],
name='new_name',
expected_res_status=200)
spec = self.show_servicechain_spec(spec['id'])['servicechain_spec']
# Verify param names correspondence
self.assertEqual(
collections.Counter(params_node_1),
collections.Counter(ast.literal_eval(spec['config_param_names'])))
# Update a node with new config params
self.update_servicechain_node(node1['id'],
config=params_dict(params_node_3),
expected_res_status=200)
spec = self.show_servicechain_spec(spec['id'])['servicechain_spec']
# Verify param names correspondence
self.assertEqual(
collections.Counter(params_node_3),
collections.Counter(ast.literal_eval(spec['config_param_names'])))

View File

@@ -1,624 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from unittest import mock
import heatclient
from neutron_lib.api.definitions import external_net
from neutron_lib import context as neutron_context
from neutron_lib.plugins import constants
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
import webob
from gbpservice.neutron.services.servicechain.plugins.ncp import config
from gbpservice.neutron.services.servicechain.plugins.ncp.node_drivers import (
heat_node_driver as heat_node_driver)
from gbpservice.neutron.services.servicechain.plugins.ncp.node_drivers import (
openstack_heat_api_client as heatClient)
from gbpservice.neutron.tests.unit.services.grouppolicy import (
test_resource_mapping as test_gp_driver)
from gbpservice.neutron.tests.unit.services.servicechain.ncp import (
test_ncp_plugin as test_ncp_plugin)
STACK_ACTION_WAIT_TIME = 15
class MockStackObject(object):
def __init__(self, status):
self.stack_status = status
class MockHeatClientFunctionsDeleteNotFound(object):
def delete(self, stack_id):
raise heatclient.exc.HTTPNotFound()
def create(self, **fields):
return {'stack': {'id': uuidutils.generate_uuid()}}
def get(self, stack_id):
return MockStackObject('DELETE_COMPLETE')
class MockHeatClientFunctions(object):
def delete(self, stack_id):
pass
def create(self, **fields):
return {'stack': {'id': uuidutils.generate_uuid()}}
def get(self, stack_id):
return MockStackObject('DELETE_COMPLETE')
def update(self, *args, **fields):
return {'stack': {'id': uuidutils.generate_uuid()}}
class MockHeatClientDeleteNotFound(object):
def __init__(self, api_version, endpoint, **kwargs):
self.stacks = MockHeatClientFunctionsDeleteNotFound()
class MockHeatClient(object):
def __init__(self, api_version, endpoint, **kwargs):
self.stacks = MockHeatClientFunctions()
self.resources = mock.MagicMock()
class HeatNodeDriverTestCase(
test_ncp_plugin.NodeCompositionPluginTestCase):
DEFAULT_LB_CONFIG_DICT = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"test_pool": {
"Type": "OS::Neutron::LBaaS::Pool",
"Properties": {
"description": "Haproxy pool from template",
"lb_algorithm": "ROUND_ROBIN",
"protocol": "HTTP",
'listener': {'get_resource': 'listener'},
}
},
"test_listener": {
"Type": "OS::Neutron::LBaaS::Listener",
"Properties": {
"protocol": "HTTP",
"protocol_port": 80,
}
},
"test_lb": {
"Type": "OS::Neutron::LBaaS::LoadBalancer",
"Properties": {
"provider": 'haproxy',
'vip_address': '1.1.1.1',
'vip_subnet': '1.1.1.0/24',
}
}
}
}
DEFAULT_LB_CONFIG = jsonutils.dumps(DEFAULT_LB_CONFIG_DICT)
DEFAULT_FW_CONFIG_DICT = {
"heat_template_version": "2013-05-23",
"resources": {
'test_fw': {
"type": "OS::Neutron::Firewall",
"properties": {
"admin_state_up": True,
"firewall_policy_id": {
"get_resource": "Firewall_policy"},
"name": "testFirewall",
"description": "test Firewall"
}
},
'test_fw_policy': {
"type": "OS::Neutron::FirewallPolicy",
"properties": {
"shared": False,
"description": "test firewall policy",
"name": "testFWPolicy",
"firewall_rules": [{
"get_resource": "Rule_1"}],
"audited": True
}
}
}
}
DEFAULT_FW_CONFIG = jsonutils.dumps(DEFAULT_FW_CONFIG_DICT)
SERVICE_PROFILE_VENDOR = 'heat_based_node_driver'
def setUp(self):
config.cfg.CONF.set_override('stack_action_wait_time',
STACK_ACTION_WAIT_TIME,
group='heat_node_driver')
mock.patch(heatclient.__name__ + ".client.Client",
new=MockHeatClient).start()
super(HeatNodeDriverTestCase, self).setUp(
node_drivers=['heat_node_driver'],
node_plumber='stitching_plumber',
core_plugin=test_gp_driver.CORE_PLUGIN)
def _create_network(self, fmt, name, admin_state_up, **kwargs):
"""Override the routine for allowing the router:external attribute."""
# attributes containing a colon should be passed with
# a double underscore
new_args = dict(zip([x.replace('__', ':') for x in kwargs],
list(kwargs.values())))
arg_list = new_args.pop('arg_list', ()) + (external_net.EXTERNAL,)
return super(HeatNodeDriverTestCase, self)._create_network(
fmt, name, admin_state_up, arg_list=arg_list, **new_args)
def test_manager_initialized(self):
mgr = self.plugin.driver_manager
self.assertIsInstance(mgr.ordered_drivers[0].obj,
heat_node_driver.HeatNodeDriver)
for driver in mgr.ordered_drivers:
self.assertTrue(driver.obj.initialized)
def _create_profiled_servicechain_node(
self, service_type=constants.LOADBALANCERV2, shared_profile=False,
profile_tenant_id=None, profile_id=None, **kwargs):
if not profile_id:
prof = self.create_service_profile(
service_type=service_type,
shared=shared_profile,
vendor=self.SERVICE_PROFILE_VENDOR,
tenant_id=profile_tenant_id or self._tenant_id)[
'service_profile']
else:
prof = self.get_service_profile(profile_id)
service_config = kwargs.get('config')
if not service_config or service_config == '{}':
if service_type == constants.FIREWALL:
kwargs['config'] = self.DEFAULT_FW_CONFIG
else:
kwargs['config'] = self.DEFAULT_LB_CONFIG
return self.create_servicechain_node(
service_profile_id=prof['id'], **kwargs)
class TestServiceChainInstance(HeatNodeDriverTestCase):
def _get_node_instance_stacks(self, sc_node_id):
context = neutron_context.get_admin_context()
with context.session.begin(subtransactions=True):
return (context.session.query(
heat_node_driver.ServiceNodeInstanceStack).
filter_by(sc_node_id=sc_node_id).
all())
def test_invalid_service_type_rejected(self):
node_used = self._create_profiled_servicechain_node(
service_type="test")['servicechain_node']
spec_used = self.create_servicechain_spec(
nodes=[node_used['id']])['servicechain_spec']
provider = self.create_policy_target_group()['policy_target_group']
classifier = self.create_policy_classifier()['policy_classifier']
res = self.create_servicechain_instance(
provider_ptg_id=provider['id'],
classifier_id=classifier['id'],
servicechain_specs=[spec_used['id']],
expected_res_status=webob.exc.HTTPBadRequest.code)
self.assertEqual('NoDriverAvailableForAction',
res['NeutronError']['type'])
def test_node_create(self):
with mock.patch.object(heatClient.HeatClient,
'create') as stack_create:
stack_create.return_value = {'stack': {
'id': uuidutils.generate_uuid()}}
self._create_simple_service_chain()
expected_stack_name = mock.ANY
expected_stack_params = mock.ANY
stack_create.assert_called_once_with(
expected_stack_name,
self.DEFAULT_LB_CONFIG_DICT,
expected_stack_params)
def _get_pool_member_resource_dict(self, port):
member_ip = port['fixed_ips'][0]['ip_address']
member_name = 'mem-' + member_ip
member = {member_name: {
'Type': 'OS::Neutron::LBaaS::PoolMember',
'Properties': {
'subnet': {'get_param': 'Subnet'},
'weight': 1,
'admin_state_up': True,
'address': member_ip,
'protocol_port': {'get_param': 'app_port'},
'pool': {'Ref': 'test_pool'}
}
}
}
return member
def _create_policy_target_port(self, policy_target_group_id):
pt = self.create_policy_target(
policy_target_group_id=policy_target_group_id)['policy_target']
req = self.new_show_request('ports', pt['port_id'], fmt=self.fmt)
port = self.deserialize(self.fmt,
req.get_response(self.api))['port']
return (pt, port)
def _create_external_policy(self, consumed_prs, routes=None):
with self.network(router__external=True, shared=True) as net:
with self.subnet(cidr='192.168.0.0/24', network=net) as sub:
if not routes:
routes = [{'destination': '172.0.0.0/22', 'nexthop': None}]
self.create_external_segment(
shared=True,
name="default",
external_routes=routes,
subnet_id=sub['subnet']['id'])
return self.create_external_policy(
consumed_policy_rule_sets={consumed_prs: ''})
def _test_lb_node_create(self, consumer_external=False):
with mock.patch.object(heatClient.HeatClient,
'create') as stack_create:
stack_create.return_value = {'stack': {
'id': uuidutils.generate_uuid()}}
node_id = self._create_profiled_servicechain_node(
service_type=constants.LOADBALANCERV2)[
'servicechain_node']['id']
spec = self.create_servicechain_spec(
nodes=[node_id],
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
provider = self.create_policy_target_group()['policy_target_group']
_, port1 = self._create_policy_target_port(provider['id'])
_, port2 = self._create_policy_target_port(provider['id'])
if consumer_external:
self._create_external_policy(prs['id'])
else:
self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})
self.update_policy_target_group(
provider['id'], provided_policy_rule_sets={prs['id']: ''})
created_stacks_map = self._get_node_instance_stacks(node_id)
self.assertEqual(1, len(created_stacks_map))
pool_member1 = self._get_pool_member_resource_dict(port1)
pool_member2 = self._get_pool_member_resource_dict(port2)
# Instantiating the chain invokes stack create
expected_stack_template = copy.deepcopy(
self.DEFAULT_LB_CONFIG_DICT)
expected_stack_template['Resources'].update(pool_member1)
expected_stack_template['Resources'].update(pool_member2)
expected_stack_name = mock.ANY
# TODO(Magesh): Verify expected_stack_params with IP address from
# Network Service Policy
expected_stack_params = {}
stack_create.assert_called_once_with(
expected_stack_name,
expected_stack_template,
expected_stack_params)
return (expected_stack_template, provider,
created_stacks_map[0].stack_id)
def _test_lb_dynamic_pool_member_add(self, expected_stack_template,
provider, stack_id):
with mock.patch.object(heatClient.HeatClient,
'update') as stack_update:
stack_update.return_value = {'stack': {
'id': stack_id}}
# Creating PT will update the node, thereby adding the PT as an
# LB Pool Member using heat stack
pt, port = self._create_policy_target_port(provider['id'])
pool_member = self._get_pool_member_resource_dict(port)
expected_stack_template['Resources'].update(pool_member)
expected_stack_id = stack_id
expected_stack_params = {}
stack_update.assert_called_once_with(
expected_stack_id,
expected_stack_template,
expected_stack_params)
return (pt, pool_member)
def _test_dynamic_lb_pool_member_delete(self, pt, pool_member,
expected_stack_template,
stack_id):
# Deleting PT will update the node, thereby removing the Pool
# Member from heat stack
with mock.patch.object(heatClient.HeatClient,
'update') as stack_update:
self.delete_policy_target(pt['id'])
template_on_delete_pt = copy.deepcopy(expected_stack_template)
template_on_delete_pt['Resources'].pop(list(pool_member.keys())[0])
expected_stack_id = stack_id
expected_stack_params = {}
stack_update.assert_called_once_with(
expected_stack_id,
template_on_delete_pt,
expected_stack_params)
def _test_node_cleanup(self, ptg, stack_id):
with mock.patch.object(heatClient.HeatClient,
'delete') as stack_delete:
self.update_policy_target_group(
ptg['id'], consumed_policy_rule_sets={},
expected_res_status=200)
self.delete_policy_target_group(ptg['id'], expected_res_status=204)
stack_delete.assert_called_once_with(stack_id)
def test_lb_node_operations(self):
expected_stack_template, provider, stack_id = (
self._test_lb_node_create())
pt, pool_member = self._test_lb_dynamic_pool_member_add(
expected_stack_template, provider, stack_id)
self._test_dynamic_lb_pool_member_delete(
pt, pool_member, expected_stack_template, stack_id)
self._test_node_cleanup(provider, stack_id)
def test_lb_redirect_from_external(self):
expected_stack_template, provider, stack_id = (
self._test_lb_node_create(consumer_external=True))
pt, pool_member = self._test_lb_dynamic_pool_member_add(
expected_stack_template, provider, stack_id)
self._test_dynamic_lb_pool_member_delete(
pt, pool_member, expected_stack_template, stack_id)
self._test_node_cleanup(provider, stack_id)
def _create_fwredirect_ruleset(self, classifier_port, classifier_protocol):
node_id = self._create_profiled_servicechain_node(
service_type=constants.FIREWALL)['servicechain_node']['id']
spec = self.create_servicechain_spec(
nodes=[node_id],
expected_res_status=201)['servicechain_spec']
action = self.create_policy_action(action_type='REDIRECT',
action_value=spec['id'])
classifier = self.create_policy_classifier(
port_range=classifier_port, protocol=classifier_protocol,
direction='bi')
rule = self.create_policy_rule(
policy_actions=[action['policy_action']['id']],
policy_classifier_id=classifier['policy_classifier']['id'])
rule = rule['policy_rule']
prs = self.create_policy_rule_set(policy_rules=[rule['id']])
return (prs['policy_rule_set'], node_id)
def _get_ptg_cidr(self, ptg):
req = self.new_show_request(
'subnets', ptg['subnets'][0], fmt=self.fmt)
ptg_subnet = self.deserialize(
self.fmt, req.get_response(self.api))['subnet']
return ptg_subnet['cidr']
def _get_firewall_rule_dict(self, rule_name, protocol, port, provider_cidr,
consumer_cidr):
if provider_cidr and consumer_cidr:
fw_rule = {rule_name: {'type': "OS::Neutron::FirewallRule",
'properties': {
"protocol": protocol,
"enabled": True,
"destination_port": port,
"action": "allow",
"destination_ip_address": provider_cidr,
"source_ip_address": consumer_cidr
}
}
}
return fw_rule
return {}
def test_fw_node_east_west(self):
classifier_port = '66'
classifier_protocol = 'udp'
with mock.patch.object(heatClient.HeatClient,
'create') as stack_create:
stack_create.return_value = {'stack': {
'id': uuidutils.generate_uuid()}}
prs, node_id = self._create_fwredirect_ruleset(
classifier_port, classifier_protocol)
provider = self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})[
'policy_target_group']
self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})
created_stacks_map = self._get_node_instance_stacks(node_id)
self.assertEqual(1, len(created_stacks_map))
stack_id = created_stacks_map[0].stack_id
provider_cidr = self._get_ptg_cidr(provider)
# TODO(ivar): This has to be removed once support to consumer list
# is implemented
# consumer_cidr = self._get_ptg_cidr(consumer)
consumer_cidr = []
fw_rule = self._get_firewall_rule_dict(
'Rule_1', classifier_protocol, classifier_port,
provider_cidr, consumer_cidr)
expected_stack_template = copy.deepcopy(
self.DEFAULT_FW_CONFIG_DICT)
expected_stack_template['resources'][
'test_fw_policy']['properties']['firewall_rules'] = []
expected_stack_template['resources'].update(fw_rule)
expected_stack_name = mock.ANY
expected_stack_params = {}
stack_create.assert_called_once_with(
expected_stack_name,
expected_stack_template,
expected_stack_params)
self._test_node_cleanup(provider, stack_id)
def _test_fw_node_north_south(self, consumer_cidrs):
classifier_port = '66'
classifier_protocol = 'udp'
with mock.patch.object(heatClient.HeatClient,
'create') as stack_create:
stack_create.return_value = {'stack': {
'id': uuidutils.generate_uuid()}}
prs, node_id = self._create_fwredirect_ruleset(
classifier_port, classifier_protocol)
provider = self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})[
'policy_target_group']
routes = []
for consumer_cidr in consumer_cidrs:
routes.append({'destination': consumer_cidr, 'nexthop': None})
self._create_external_policy(prs['id'], routes=routes)
# TODO(ivar): This has to be removed once support to consumer list
# is implemented
consumer_cidrs = []
created_stacks_map = self._get_node_instance_stacks(node_id)
self.assertEqual(1, len(created_stacks_map))
stack_id = created_stacks_map[0].stack_id
expected_stack_template = copy.deepcopy(
self.DEFAULT_FW_CONFIG_DICT)
expected_stack_template['resources']['test_fw_policy'][
'properties']['firewall_rules'] = []
provider_cidr = self._get_ptg_cidr(provider)
rule_num = 1
for consumer_cidr in consumer_cidrs:
rule_name = 'Rule_' + str(rule_num)
fw_rule = self._get_firewall_rule_dict(
rule_name, classifier_protocol, classifier_port,
provider_cidr, consumer_cidr)
rule_num = rule_num + 1
expected_stack_template['resources'].update(fw_rule)
expected_stack_template['resources']['test_fw_policy'][
'properties']['firewall_rules'].append(
{'get_resource': rule_name})
expected_stack_name = mock.ANY
expected_stack_params = {}
stack_create.assert_called_once_with(
expected_stack_name,
expected_stack_template,
expected_stack_params)
self._test_node_cleanup(provider, stack_id)
def test_fw_node_north_south_single_external_cidr(self):
self._test_fw_node_north_south(['172.0.0.0/22'])
def test_fw_node_north_south_multiple_external_cidr(self):
self._test_fw_node_north_south(['172.0.0.0/22', '20.0.0.0/16'])
def test_node_update(self):
with mock.patch.object(heatClient.HeatClient,
'create') as stack_create:
stack_create.return_value = {'stack': {
'id': uuidutils.generate_uuid()}}
prof = self.create_service_profile(
service_type=constants.LOADBALANCERV2,
vendor=self.SERVICE_PROFILE_VENDOR)['service_profile']
node = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_LB_CONFIG,
expected_res_status=201)['servicechain_node']
self._create_chain_with_nodes(node_ids=[node['id']])
with mock.patch.object(heatClient.HeatClient,
'update') as stack_update:
self.update_servicechain_node(
node['id'],
name='newname',
expected_res_status=200)
# Name update should not update stack ??
stack_update.assert_called_once_with(
mock.ANY, mock.ANY, mock.ANY)
def test_node_delete(self):
with mock.patch.object(heatClient.HeatClient,
'create') as stack_create:
stack_create.return_value = {'stack': {
'id': uuidutils.generate_uuid()}}
provider, _, _ = self._create_simple_service_chain()
with mock.patch.object(heatClient.HeatClient,
'delete'):
self.update_policy_target_group(
provider['id'],
provided_policy_rule_sets={},
expected_res_status=200)
self.delete_policy_target_group(provider['id'],
expected_res_status=204)
def test_wait_stack_delete_for_instance_delete(self):
with mock.patch.object(heatClient.HeatClient,
'create') as stack_create:
stack_create.return_value = {'stack': {
'id': uuidutils.generate_uuid()}}
provider, _, _ = self._create_simple_service_chain()
# Verify that as part of delete service chain instance we call
# get method for heat stack 5 times before giving up if the state
# does not become DELETE_COMPLETE
with mock.patch.object(heatClient.HeatClient,
'delete') as stack_delete:
with mock.patch.object(heatClient.HeatClient,
'get') as stack_get:
stack_get.return_value = MockStackObject(
'DELETE_IN_PROGRESS')
# Removing the PRSs will make the PTG deletable again
self.update_policy_target_group(
provider['id'],
provided_policy_rule_sets={},
expected_res_status=200)
self.delete_policy_target_group(provider['id'],
expected_res_status=204)
stack_delete.assert_called_once_with(mock.ANY)
# Create and delete another service chain instance and verify that
# we call get method for heat stack only once if the stack state
# is DELETE_COMPLETE
provider, _, _ = self._create_simple_service_chain()
with mock.patch.object(heatClient.HeatClient,
'delete') as stack_delete:
with mock.patch.object(heatClient.HeatClient,
'get') as stack_get:
stack_get.return_value = MockStackObject(
'DELETE_COMPLETE')
# Removing the PRSs will make the PTG deletable again
self.update_policy_target_group(
provider['id'],
provided_policy_rule_sets={},
expected_res_status=200)
self.delete_policy_target_group(provider['id'],
expected_res_status=204)
stack_delete.assert_called_once_with(mock.ANY)
def test_stack_not_found_ignored(self):
mock.patch(heatclient.__name__ + ".client.Client",
new=MockHeatClientDeleteNotFound).start()
provider, _, _ = self._create_simple_service_chain()
# Removing the PRSs will make the PTG deletable again
self.update_policy_target_group(provider['id'],
provided_policy_rule_sets={},
expected_res_status=200)
self.delete_policy_target_group(provider['id'],
expected_res_status=204)

View File

@@ -1,899 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock
import webob.exc
from neutron.common import config
from neutron_lib import context as n_context
from neutron_lib import exceptions as n_exc
from neutron_lib.plugins import constants as pconst
from neutron_lib.plugins import directory
from oslo_config import cfg
from oslo_serialization import jsonutils
from gbpservice.neutron.services.grouppolicy import config as gpconfig # noqa
from gbpservice.neutron.services.servicechain.plugins.ncp import (
context as ncp_context)
from gbpservice.neutron.services.servicechain.plugins.ncp import (
exceptions as exc)
from gbpservice.neutron.services.servicechain.plugins.ncp import (
plugin as ncp_plugin)
import gbpservice.neutron.services.servicechain.plugins.ncp.config # noqa
from gbpservice.neutron.services.servicechain.plugins.ncp.node_drivers import (
dummy_driver as dummy_driver)
from gbpservice.neutron.tests.unit.db.grouppolicy import test_group_policy_db
from gbpservice.neutron.tests.unit.services.grouppolicy import (
test_resource_mapping as test_gp_driver)
from gbpservice.neutron.tests.unit.services.servicechain import (
base_test_servicechain_plugin as test_base)
class ServiceChainNCPTestPlugin(ncp_plugin.NodeCompositionPlugin):
supported_extension_aliases = ['servicechain'] + (
test_group_policy_db.UNSUPPORTED_REQUIRED_EXTS)
path_prefix = "/servicechain"
SC_PLUGIN_KLASS = (ServiceChainNCPTestPlugin.__module__ + '.' +
ServiceChainNCPTestPlugin.__name__)
CORE_PLUGIN = test_gp_driver.CORE_PLUGIN
GP_PLUGIN_KLASS = (
"gbpservice.neutron.services.grouppolicy.plugin.GroupPolicyPlugin"
)
CHAIN_TENANT_ID = 'sci_owner'
class NodeCompositionPluginTestMixin(object):
DEFAULT_LB_CONFIG = '{}'
SERVICE_PROFILE_VENDOR = 'dummy'
@property
def sc_plugin(self):
return directory.get_plugin(pconst.SERVICECHAIN)
def _create_service_profile(self, **kwargs):
"""Create service profile wrapper that can be used by drivers."""
return self.create_service_profile(**kwargs)
def _create_redirect_rule(self, spec_id):
action = self.create_policy_action(action_type='REDIRECT',
action_value=spec_id)
classifier = self.create_policy_classifier(
port_range=80, protocol='tcp', direction='bi')
rule = self.create_policy_rule(
policy_actions=[action['policy_action']['id']],
policy_classifier_id=classifier['policy_classifier']['id'])
return rule
def _create_redirect_prs(self, spec_id):
rule = self._create_redirect_rule(spec_id)['policy_rule']
prs = self.create_policy_rule_set(policy_rules=[rule['id']])
return prs
def _create_simple_service_chain(self, number_of_nodes=1,
service_type='LOADBALANCERV2'):
prof = self.create_service_profile(
service_type=service_type,
vendor=self.SERVICE_PROFILE_VENDOR)['service_profile']
node_ids = []
for x in range(number_of_nodes):
node_ids.append(self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_LB_CONFIG,
expected_res_status=201)['servicechain_node']['id'])
return self._create_chain_with_nodes(node_ids)
def _create_chain_with_nodes(self, node_ids=None):
node_ids = node_ids or []
spec = self.create_servicechain_spec(
nodes=node_ids,
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
provider = self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})['policy_target_group']
consumer = self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})['policy_target_group']
return provider, consumer, prs
def _add_node_driver(self, name):
inst = dummy_driver.NoopNodeDriver()
inst.initialize(name)
ext = mock.Mock()
ext.obj = inst
self.sc_plugin.driver_manager.ordered_drivers.append(ext)
self.sc_plugin.driver_manager.drivers[name] = ext
class NodeCompositionPluginTestCase(
test_base.BaseTestGroupPolicyPluginGroupResources,
NodeCompositionPluginTestMixin):
def setUp(self, core_plugin=None, gp_plugin=None, node_drivers=None,
node_plumber=None):
cfg.CONF.set_override(
'extension_drivers', ['proxy_group'], group='group_policy')
if node_drivers:
cfg.CONF.set_override('node_drivers', node_drivers,
group='node_composition_plugin')
cfg.CONF.set_override('node_plumber', node_plumber or 'dummy_plumber',
group='node_composition_plugin')
config.cfg.CONF.set_override('policy_drivers',
['implicit_policy', 'resource_mapping',
'chain_mapping'],
group='group_policy')
super(NodeCompositionPluginTestCase, self).setUp(
core_plugin=core_plugin or CORE_PLUGIN,
gp_plugin=gp_plugin or GP_PLUGIN_KLASS,
sc_plugin=SC_PLUGIN_KLASS)
self.driver = self.sc_plugin.driver_manager.ordered_drivers[0].obj
def _create_simple_chain(self):
node = self._create_profiled_servicechain_node(
service_type="LOADBALANCERV2",
config=self.DEFAULT_LB_CONFIG)['servicechain_node']
spec = self.create_servicechain_spec(
nodes=[node['id']])['servicechain_spec']
action = self.create_policy_action(
action_type='REDIRECT', action_value=spec['id'])['policy_action']
classifier = self.create_policy_classifier(
direction='bi', port_range=80, protocol='tcp')['policy_classifier']
rule = self.create_policy_rule(
policy_classifier_id=classifier['id'],
policy_actions=[action['id']])['policy_rule']
prs = self.create_policy_rule_set(
policy_rules=[rule['id']])['policy_rule_set']
provider = self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})['policy_target_group']
consumer = self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})['policy_target_group']
return provider, consumer, node
def test_spec_ordering_list_servicechain_instances(self):
pass
def test_context_attributes(self):
# Verify Context attributes for simple config
plugin_context = n_context.get_admin_context()
profile = self._create_service_profile(
service_type="LOADBALANCERV2",
vendor=self.SERVICE_PROFILE_VENDOR)['service_profile']
node = self.create_servicechain_node(
service_profile_id=profile['id'],
config=self.DEFAULT_LB_CONFIG)['servicechain_node']
spec = self.create_servicechain_spec(
nodes=[node['id']])['servicechain_spec']
provider = self.create_policy_target_group()['policy_target_group']
self.create_policy_target_group()
management = self.create_policy_target_group(
service_management=True,
is_admin_context=True)['policy_target_group']
classifier = self.create_policy_classifier()['policy_classifier']
instance = self.create_servicechain_instance(
provider_ptg_id=provider['id'], consumer_ptg_id='N/A',
servicechain_specs=[spec['id']], classifier_id=classifier['id'])[
'servicechain_instance']
# Verify created without errors
ctx = ncp_context.get_node_driver_context(
self.plugin, plugin_context, instance, node)
self.assertIsNotNone(ctx.gbp_plugin)
self.assertIsNotNone(ctx.sc_plugin)
self.assertIsNotNone(ctx.core_plugin)
self.assertIsNotNone(ctx.plugin_context)
self.assertIsNotNone(ctx.plugin_session)
self.assertIsNotNone(ctx.session)
self.assertIsNotNone(ctx.admin_context)
self.assertIsNotNone(ctx.admin_session)
del ctx.current_profile['nodes']
self.assertEqual(ctx.current_profile['id'], profile['id'])
self.assertEqual(instance['id'], ctx.instance['id'])
self.assertEqual(provider['id'], ctx.provider['id'])
self.assertIsNone(ctx.consumer)
self.assertEqual(management['id'], ctx.management['id'])
self.assertEqual([spec['id']], [x['id'] for x in ctx.relevant_specs])
self.assertIsNone(ctx.original_node)
self.assertEqual(0, len(ctx.get_service_targets()))
instance['provider_ptg_id'] = 'dummy-id'
ctx = ncp_context.get_node_driver_context(
self.plugin, plugin_context, instance, node)
self.assertIsNone(ctx.provider)
self.assertIsNone(ctx.consumer)
def test_context_relevant_specs(self):
plugin_context = n_context.get_admin_context()
node_used = self._create_profiled_servicechain_node(
service_type="LOADBALANCERV2",
config=self.DEFAULT_LB_CONFIG)['servicechain_node']
spec_used = self.create_servicechain_spec(
nodes=[node_used['id']])['servicechain_spec']
provider = self.create_policy_target_group()['policy_target_group']
classifier = self.create_policy_classifier()['policy_classifier']
instance = self.create_servicechain_instance(
provider_ptg_id=provider['id'],
classifier_id=classifier['id'],
servicechain_specs=[spec_used['id']])['servicechain_instance']
ctx = ncp_context.get_node_driver_context(
self.plugin, plugin_context, instance, node_used)
self.assertEqual([spec_used['id']],
[x['id'] for x in ctx.relevant_specs])
def test_manager_initialized(self):
mgr = self.plugin.driver_manager
self.assertIsInstance(mgr.ordered_drivers[0].obj,
dummy_driver.NoopNodeDriver)
for driver in mgr.ordered_drivers:
self.assertTrue(driver.obj.initialized)
def test_spec_parameters(self):
"""Test that config_param_names is empty when using NCP.
In NCP the config attribute of a node may be something different than
a HEAT template, therefore config_param_names is not used.
"""
params_node_1 = ['p1', 'p2', 'p3']
params_node_2 = ['p4', 'p5', 'p6']
params_node_3 = ['p7', 'p8', 'p9']
def params_dict(params):
return jsonutils.dumps({'Parameters':
dict((x, {}) for x in params)})
prof = self._create_service_profile(
service_type='LOADBALANCERV2', shared=True,
vendor=self.SERVICE_PROFILE_VENDOR,
tenant_id='admin')['service_profile']
# Create 2 nodes with different parameters
node1 = self.create_servicechain_node(
service_profile_id=prof['id'], shared=True,
config=params_dict(params_node_1),
expected_res_status=201)['servicechain_node']
node2 = self.create_servicechain_node(
service_profile_id=prof['id'], shared=True,
config=params_dict(params_node_2),
expected_res_status=201)['servicechain_node']
# Create SC spec with the nodes assigned
spec = self.create_servicechain_spec(
nodes=[node1['id'], node2['id']], shared=True,
expected_res_status=201)['servicechain_spec']
# Verify param names is empty
self.assertIsNone(spec['config_param_names'])
# Update the spec removing one node
self.update_servicechain_spec(spec['id'], nodes=[node1['id']],
expected_res_status=200)
spec = self.show_servicechain_spec(spec['id'])['servicechain_spec']
# Verify param names is empty
self.assertIsNone(spec['config_param_names'])
# Update a node with new config params
self.update_servicechain_node(node1['id'],
config=params_dict(params_node_3),
expected_res_status=200)
spec = self.show_servicechain_spec(spec['id'])['servicechain_spec']
# Verify param names is empty
self.assertIsNone(spec['config_param_names'])
def test_create_service_chain(self):
deploy = self.driver.create = mock.Mock()
destroy = self.driver.delete = mock.Mock()
self._create_simple_service_chain(1)
self.assertEqual(1, deploy.call_count)
self.assertEqual(0, destroy.call_count)
deploy.reset_mock()
provider, _, _ = self._create_simple_service_chain(3)
self.assertEqual(3, deploy.call_count)
self.assertEqual(0, destroy.call_count)
self.update_policy_target_group(provider['id'],
provided_policy_rule_sets={})
self.assertEqual(3, deploy.call_count)
self.assertEqual(3, destroy.call_count)
def test_update_service_chain(self):
deploy = self.driver.create = mock.Mock()
update = self.driver.update = mock.Mock()
destroy = self.driver.delete = mock.Mock()
provider, _, prs = self._create_simple_service_chain(1)
self.assertEqual(1, deploy.call_count)
self.assertEqual(0, destroy.call_count)
# REVISIT(Magesh): When bug #1446587 is fixed, we should test by
# performing a classifier or rule update instead of SC instance update
instances = self._list('servicechain_instances')[
'servicechain_instances']
self.assertEqual(1, len(instances))
self.update_servicechain_instance(
instances[0]['id'],
expected_res_status=200)
self.assertEqual(1, update.call_count)
self.assertEqual(0, destroy.call_count)
def test_create_service_chain_fails(self):
deploy = self.driver.create = mock.Mock()
destroy = self.driver.delete = mock.Mock()
deploy.side_effect = Exception
try:
self._create_simple_service_chain(3)
except Exception:
pass
self.assertEqual(1, deploy.call_count)
self.assertEqual(3, destroy.call_count)
def test_update_node_fails(self):
validate_update = self.driver.validate_update = mock.Mock()
prof = self._create_service_profile(
service_type='LOADBALANCERV2',
vendor=self.SERVICE_PROFILE_VENDOR)['service_profile']
node_id = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_LB_CONFIG,
expected_res_status=201)['servicechain_node']['id']
spec = self.create_servicechain_spec(
nodes=[node_id],
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})
self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})
validate_update.side_effect = exc.NodeCompositionPluginBadRequest(
resource='node', msg='reason')
res = self.update_servicechain_node(node_id,
description='somethingelse',
expected_res_status=400)
self.assertEqual('NodeCompositionPluginBadRequest',
res['NeutronError']['type'])
def test_update_instantiated_profile_fails(self):
prof = self._create_service_profile(
service_type='LOADBALANCERV2',
vendor=self.SERVICE_PROFILE_VENDOR)['service_profile']
node_id = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_LB_CONFIG,
expected_res_status=201)['servicechain_node']['id']
spec = self.create_servicechain_spec(
nodes=[node_id], expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})
self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})
res = self.update_service_profile(prof['id'],
vendor='somethingelse',
expected_res_status=400)
self.assertEqual('ServiceProfileInUseByAnInstance',
res['NeutronError']['type'])
def test_second_driver_scheduled_if_first_fails(self):
self._add_node_driver('test')
drivers = [x.obj for x in
self.sc_plugin.driver_manager.ordered_drivers]
create_1 = drivers[0].validate_create = mock.Mock()
create_1.side_effect = n_exc.NeutronException()
# This happens without error
profile = self._create_service_profile(
service_type="TYPE",
vendor=self.SERVICE_PROFILE_VENDOR)['service_profile']
node = self.create_servicechain_node(
service_profile_id=profile['id'],
config=self.DEFAULT_LB_CONFIG)['servicechain_node']
spec = self.create_servicechain_spec(
nodes=[node['id']])['servicechain_spec']
provider = self.create_policy_target_group()['policy_target_group']
classifier = self.create_policy_classifier()['policy_classifier']
self.create_servicechain_instance(
provider_ptg_id=provider['id'], consumer_ptg_id='N/A',
servicechain_specs=[spec['id']], classifier_id=classifier['id'],
expected_res_status=201)
def test_chain_fails_if_no_drivers_available(self):
self._add_node_driver('test')
drivers = [x.obj for x in
self.sc_plugin.driver_manager.ordered_drivers]
create_1 = drivers[0].validate_create = mock.Mock()
create_1.side_effect = n_exc.NeutronException()
create_2 = drivers[1].validate_create = mock.Mock()
create_2.side_effect = n_exc.NeutronException()
profile = self._create_service_profile(
service_type="TYPE",
vendor=self.SERVICE_PROFILE_VENDOR)['service_profile']
node = self.create_servicechain_node(
service_profile_id=profile['id'],
config=self.DEFAULT_LB_CONFIG)['servicechain_node']
spec = self.create_servicechain_spec(
nodes=[node['id']])['servicechain_spec']
provider = self.create_policy_target_group()['policy_target_group']
classifier = self.create_policy_classifier()['policy_classifier']
self.create_servicechain_instance(
provider_ptg_id=provider['id'], consumer_ptg_id='N/A',
servicechain_specs=[spec['id']], classifier_id=classifier['id'],
expected_res_status=400)
def test_multiple_nodes_update(self):
update = self.driver.update = mock.Mock()
prof = self._create_service_profile(
service_type='LOADBALANCERV2',
vendor=self.SERVICE_PROFILE_VENDOR)['service_profile']
node = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_LB_CONFIG)['servicechain_node']
self._create_chain_with_nodes([node['id']])
self.update_servicechain_node(node['id'], name='somethingelse')
self.assertEqual(1, update.call_count)
update.reset_mock()
self._create_chain_with_nodes([node['id']])
self._create_chain_with_nodes([node['id']])
self.update_servicechain_node(node['id'], name='somethingelse')
self.assertEqual(3, update.call_count)
def test_inuse_spec_node_update_rejected(self):
prof = self.create_service_profile(
service_type='LOADBALANCERV2',
vendor=self.SERVICE_PROFILE_VENDOR)['service_profile']
node1 = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_LB_CONFIG)['servicechain_node']
node2 = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_LB_CONFIG)['servicechain_node']
spec = self.create_servicechain_spec(
nodes=[node1['id'], node2['id']],
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})
self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})
res = self.update_servicechain_spec(spec['id'],
nodes=[node1['id']],
expected_res_status=400)
self.assertEqual('InuseSpecNodeUpdateNotAllowed',
res['NeutronError']['type'])
def test_instance_update(self):
prof = self.create_service_profile(
service_type='LOADBALANCERV2',
vendor=self.SERVICE_PROFILE_VENDOR)['service_profile']
node1 = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_LB_CONFIG)['servicechain_node']
node2 = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_LB_CONFIG)['servicechain_node']
spec = self.create_servicechain_spec(
nodes=[node1['id'], node2['id']],
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})
self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})
instances = self._list('servicechain_instances')[
'servicechain_instances']
self.assertEqual(1, len(instances))
spec2 = self.create_servicechain_spec(
nodes=[node1['id']],
expected_res_status=201)['servicechain_spec']
res = self.update_servicechain_instance(
instances[0]['id'], servicechain_specs=[spec2['id']],
expected_res_status=200)
self.assertEqual([spec2['id']],
res['servicechain_instance']['servicechain_specs'])
def test_relevant_ptg_update(self):
add = self.driver.update_policy_target_added = mock.Mock()
rem = self.driver.update_policy_target_removed = mock.Mock()
prof = self._create_service_profile(
service_type='LOADBALANCERV2',
vendor=self.SERVICE_PROFILE_VENDOR)['service_profile']
node = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_LB_CONFIG,
expected_res_status=201)['servicechain_node']
spec = self.create_servicechain_spec(
nodes=[node['id']],
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
provider = self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})['policy_target_group']
self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})
# Verify notification issued for created PT in the provider
pt = self.create_policy_target(
policy_target_group_id=provider['id'])['policy_target']
pt['port_attributes'] = {}
self.assertEqual(1, add.call_count)
add.assert_called_with(mock.ANY, pt)
del pt['port_attributes']
# Verify notification issued for deleted PT in the provider
self.delete_policy_target(pt['id'])
self.assertEqual(1, rem.call_count)
rem.assert_called_with(mock.ANY, pt)
def test_irrelevant_ptg_update(self):
add = self.driver.update_policy_target_added = mock.Mock()
rem = self.driver.update_policy_target_removed = mock.Mock()
prof = self._create_service_profile(
service_type='LOADBALANCERV2',
vendor=self.SERVICE_PROFILE_VENDOR)['service_profile']
node = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_LB_CONFIG,
expected_res_status=201)['servicechain_node']
spec = self.create_servicechain_spec(
nodes=[node['id']], expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})
self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})
other = self.create_policy_target_group()['policy_target_group']
# Verify notification issued for created PT in the provider
pt = self.create_policy_target(
policy_target_group_id=other['id'])['policy_target']
self.assertFalse(add.called)
# Verify notification issued for deleted PT in the provider
self.delete_policy_target(pt['id'])
self.assertFalse(rem.called)
def test_notify_chain_update_hook(self):
update_hook = self.driver.notify_chain_parameters_updated = mock.Mock()
prof = self.create_service_profile(
service_type='LOADBALANCERV2',
vendor=self.SERVICE_PROFILE_VENDOR)['service_profile']
node = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_LB_CONFIG,
expected_res_status=201)['servicechain_node']
spec = self.create_servicechain_spec(
nodes=[node['id']],
expected_res_status=201)['servicechain_spec']
action = self.create_policy_action(action_type='REDIRECT',
action_value=spec['id'])
classifier = self.create_policy_classifier(
port_range=80, protocol='tcp', direction='bi')['policy_classifier']
rule = self.create_policy_rule(
policy_actions=[action['policy_action']['id']],
policy_classifier_id=classifier['id'])['policy_rule']
prs = self.create_policy_rule_set(
policy_rules=[rule['id']])['policy_rule_set']
self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})
self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})
instances = self._list('servicechain_instances')[
'servicechain_instances']
self.assertEqual(1, len(instances))
self.update_policy_classifier(classifier['id'], port_range=22)
update_hook.assert_called_with(mock.ANY)
def test_context_no_management(self):
# Verify Context attributes for simple config
plugin_context = n_context.get_admin_context()
plugin_context.is_admin = False
plugin_context.is_advsvc = False
plugin_context.tenant_id = 'test-tenant'
node = self._create_profiled_servicechain_node()['servicechain_node']
spec = self.create_servicechain_spec(
nodes=[node['id']])['servicechain_spec']
provider = self.create_policy_target_group()['policy_target_group']
# Verify admin created SM is None
management = self.create_policy_target_group(
service_management=True, tenant_id='admin',
is_admin_context=True)['policy_target_group']
pc = self.create_policy_classifier()['policy_classifier']
instance = self.create_servicechain_instance(
provider_ptg_id=provider['id'], consumer_ptg_id='N/A',
servicechain_specs=[spec['id']],
classifier_id=pc['id'])['servicechain_instance']
ctx = ncp_context.get_node_driver_context(
self.plugin, plugin_context, instance, node)
self.assertIsNone(ctx.management)
self.delete_policy_target_group(management['id'],
is_admin_context=True)
shared_management = self.create_policy_target_group(
service_management=True, tenant_id='admin',
is_admin_context=True, shared=True)['policy_target_group']
instance = self.create_servicechain_instance(
provider_ptg_id=provider['id'], consumer_ptg_id='N/A',
servicechain_specs=[spec['id']],
classifier_id=pc['id'])['servicechain_instance']
# Now admin Service Management PTG is visible
ctx = ncp_context.get_node_driver_context(
self.plugin, plugin_context, instance, node)
self.assertEqual(shared_management['id'], ctx.management['id'])
# Private management overrides shared one
private_management = self.create_policy_target_group(
service_management=True,
is_admin_context=True)['policy_target_group']
instance = self.create_servicechain_instance(
provider_ptg_id=provider['id'], consumer_ptg_id='N/A',
servicechain_specs=[spec['id']],
classifier_id=pc['id'])['servicechain_instance']
ctx = ncp_context.get_node_driver_context(
self.plugin, plugin_context, instance, node)
self.assertEqual(private_management['id'], ctx.management['id'])
def test_node_drivers_notified_consumer_event(self):
add = self.driver.update_node_consumer_ptg_added = mock.Mock()
rem = self.driver.update_node_consumer_ptg_removed = mock.Mock()
prof = self._create_service_profile(
service_type='LOADBALANCERV2',
vendor=self.SERVICE_PROFILE_VENDOR)['service_profile']
node = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_LB_CONFIG,
expected_res_status=201)['servicechain_node']
spec = self.create_servicechain_spec(
nodes=[node['id']],
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})
consumer = self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})['policy_target_group']
# Verify notification issued for PTG consuming
add.assert_called_with(mock.ANY, consumer)
# Verify notification issued for PTG unconsuming
consumer = self.update_policy_target_group(
consumer['id'],
consumed_policy_rule_sets={})['policy_target_group']
rem.assert_called_with(mock.ANY, consumer)
provider, consumer, prs = self._create_simple_service_chain(3)
with mock.patch.object(ncp_plugin.NodeCompositionPlugin,
"update_chains_consumer_removed") as ptg_removed:
plugin_context = n_context.get_admin_context()
self._gbp_plugin.delete_policy_target_group(
plugin_context, consumer['id'])
self.assertEqual(ptg_removed.call_count, 1)
consumer['consumed_policy_rule_sets'] = []
ptg_removed.assert_called_once_with(
mock.ANY, consumer, mock.ANY)
add.reset_mock()
rem.reset_mock()
def test_no_unrelated_chains_notified(self):
add = self.driver.update_node_consumer_ptg_added = mock.Mock()
rem = self.driver.update_node_consumer_ptg_removed = mock.Mock()
prof = self._create_service_profile(
service_type='LOADBALANCERV2',
vendor=self.SERVICE_PROFILE_VENDOR)['service_profile']
node = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_LB_CONFIG,
expected_res_status=201)['servicechain_node']
spec = self.create_servicechain_spec(
nodes=[node['id']],
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
# This creates a chain
self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})
# Create a PRS and assign a consumer with no provider (hence, no chain)
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
ptg = self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})['policy_target_group']
# No notification should be issued
self.assertFalse(add.called)
self.assertFalse(rem.called)
# Remove the consumer
self.update_policy_target_group(ptg['id'],
consumed_policy_rule_sets={},
expected_res_status=200)
# No notification should be issued
self.assertFalse(add.called)
self.assertFalse(rem.called)
def test_node_drivers_notified_provider_updated(self):
upd = self.driver.policy_target_group_updated = mock.Mock()
prof = self._create_service_profile(
service_type='LOADBALANCERV2',
vendor=self.SERVICE_PROFILE_VENDOR)['service_profile']
node = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_LB_CONFIG,
expected_res_status=201)['servicechain_node']
spec = self.create_servicechain_spec(
nodes=[node['id']],
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
provider = self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})['policy_target_group']
# TODO(Sumit): Remove the following mocks
# once Heat node driver supports reporting status
provider['status'] = mock.ANY
provider['status_details'] = mock.ANY
# Verify notification issued for PTG consuming
upd.assert_called_with(mock.ANY, None, provider)
upd.reset_mock()
# Verify notification issued for PTG consuming
new_provider = self.update_policy_target_group(
provider['id'],
consumed_policy_rule_sets={prs['id']: ''})['policy_target_group']
upd.assert_called_with(mock.ANY, provider, new_provider)
upd.reset_mock()
class TestQuotasForServiceChain(test_base.ServiceChainPluginTestCase):
@property
def sc_plugin(self):
return directory.get_plugin(pconst.SERVICECHAIN)
def setUp(self, core_plugin=None, gp_plugin=None, node_drivers=None,
node_plumber=None):
if node_drivers:
cfg.CONF.set_override('node_drivers', node_drivers,
group='node_composition_plugin')
cfg.CONF.set_override('node_plumber', node_plumber or 'dummy_plumber',
group='node_composition_plugin')
config.cfg.CONF.set_override('policy_drivers',
['implicit_policy', 'resource_mapping',
'chain_mapping'],
group='group_policy')
super(TestQuotasForServiceChain, self).setUp(
core_plugin=core_plugin or CORE_PLUGIN,
gp_plugin=gp_plugin or GP_PLUGIN_KLASS,
sc_plugin=SC_PLUGIN_KLASS)
self.driver = self.sc_plugin.driver_manager.ordered_drivers[0].obj
cfg.CONF.set_override('quota_servicechain_node', 1,
group='QUOTAS')
cfg.CONF.set_override('quota_servicechain_spec', 1,
group='QUOTAS')
cfg.CONF.set_override('quota_servicechain_instance', 1,
group='QUOTAS')
cfg.CONF.set_override('quota_service_profile', 1,
group='QUOTAS')
def tearDown(self):
cfg.CONF.set_override('quota_servicechain_node', -1,
group='QUOTAS')
cfg.CONF.set_override('quota_servicechain_spec', -1,
group='QUOTAS')
cfg.CONF.set_override('quota_servicechain_instance', -1,
group='QUOTAS')
cfg.CONF.set_override('quota_service_profile', -1,
group='QUOTAS')
super(TestQuotasForServiceChain, self).tearDown()
def test_servicechain_node_quota(self):
self.create_servicechain_node()
self.assertRaises(webob.exc.HTTPClientError,
self.create_servicechain_node)
def test_servicechain_spec_quota(self):
self.create_servicechain_spec()
self.assertRaises(webob.exc.HTTPClientError,
self.create_servicechain_spec)
def test_servicechain_instance_quota(self):
self.create_servicechain_instance()
self.assertRaises(webob.exc.HTTPClientError,
self.create_servicechain_instance)
def test_service_profile(self):
self.create_service_profile(service_type=pconst.FIREWALL)
self.assertRaises(webob.exc.HTTPClientError,
self.create_service_profile,
service_type=pconst.FIREWALL)
def test_quota_implicit_service_instance(self):
prof = self.create_service_profile(
service_type='LOADBALANCERV2',
vendor="vendor")['service_profile']
node1_id = self.create_servicechain_node(
service_profile_id=prof['id'], config="{}",
expected_res_status=201)['servicechain_node']['id']
spec = self.create_servicechain_spec(
nodes=[node1_id],
expected_res_status=201)['servicechain_spec']
action = self.create_policy_action(action_type='REDIRECT',
action_value=spec['id'])
classifier = self.create_policy_classifier(
port_range=80, protocol='tcp', direction='bi')
rule = self.create_policy_rule(
policy_actions=[action['policy_action']['id']],
policy_classifier_id=classifier['policy_classifier']['id'])
prs = self.create_policy_rule_set(
policy_rules=[rule['policy_rule']['id']])['policy_rule_set']
self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})
self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})
# Second service instance creation should fail now
# sice service instance quota is 1, resulting in PTG
# creation error
self.assertRaises(webob.exc.HTTPClientError,
self.create_policy_target_group,
provided_policy_rule_sets={prs['id']: ''})

View File

@@ -1,859 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock
from neutron_lib.plugins import constants
from oslo_serialization import jsonutils
import webob
from gbpservice.neutron.services.servicechain.plugins.ncp import (
plugin as ncp_plugin)
from gbpservice.neutron.services.servicechain.plugins.ncp import config # noqa
from gbpservice.neutron.services.servicechain.plugins.ncp.node_drivers import (
nfp_node_driver as nfp_node_driver)
from gbpservice.neutron.tests.unit.db.grouppolicy import test_group_policy_db
from gbpservice.neutron.tests.unit.services.grouppolicy import (
test_resource_mapping as test_gp_driver)
from gbpservice.neutron.tests.unit.services.servicechain import (
base_test_servicechain_plugin as test_base)
from gbpservice.neutron.tests.unit.services.servicechain.ncp import (
test_ncp_plugin as test_ncp_plugin)
from gbpservice.nfp.orchestrator.db import nfp_db as nfp_db
SERVICE_DELETE_TIMEOUT = 15
SVC_MANAGEMENT_PTG = 'foo'
class ServiceChainNCPTestPlugin(ncp_plugin.NodeCompositionPlugin):
supported_extension_aliases = ['servicechain'] + (
test_group_policy_db.UNSUPPORTED_REQUIRED_EXTS)
path_prefix = "/servicechain"
SC_PLUGIN_KLASS = (ServiceChainNCPTestPlugin.__module__ + '.' +
ServiceChainNCPTestPlugin.__name__)
CORE_PLUGIN = test_gp_driver.CORE_PLUGIN
GP_PLUGIN_KLASS = (
"gbpservice.neutron.services.grouppolicy.plugin.GroupPolicyPlugin"
)
class NFPNodeDriverTestCase(
test_base.BaseTestGroupPolicyPluginGroupResources,
test_ncp_plugin.NodeCompositionPluginTestMixin):
DEFAULT_VPN_CONFIG_DICT = {
"heat_template_version": "2013-05-23",
"description": "Creates new vpn service",
"parameters": {
"RouterId": {
"type": "string", "description": "Router ID"
},
"Subnet": {
"type": "string", "description": "Subnet id"
},
"ClientAddressPoolCidr": {
"type": "string", "description": "Pool"
},
},
"resources": {
"SSLVPNConnection": {
"type": "OS::Neutron::SSLVPNConnection",
"properties": {
"credential_id": "",
"client_address_pool_cidr": {
"get_param": "ClientAddressPoolCidr"
},
"name": "vtun0",
"vpnservice_id": {
"get_resource": "VPNService"
},
"admin_state_up": 'true'
}
},
"VPNService": {
"type": "OS::Neutron::VPNService",
"properties": {
"router_id": {
"get_param": "RouterId"
},
"subnet_id": {
"get_param": "Subnet"
},
"admin_state_up": 'true',
"name": "VPNService"
}
}
}
}
DEFAULT_VPN_CONFIG = jsonutils.dumps(DEFAULT_VPN_CONFIG_DICT)
DEFAULT_LB_CONFIG_DICT = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"test_pool": {
"Type": "OS::Neutron::Pool",
"Properties": {
"admin_state_up": True,
"description": "Haproxy pool from teplate",
"lb_method": "ROUND_ROBIN",
"monitors": [{"Ref": "HttpHM"}],
"name": "Haproxy pool",
"protocol": "HTTP",
"subnet_id": {"Ref": "Subnet"},
"vip": {
"subnet": {"Ref": "192.168.100.0"},
"address": {"Ref": "192.168.100.2"},
"name": "Haproxy vip",
"protocol_port": 80,
"connection_limit": -1,
"admin_state_up": True,
"description": "Haproxy vip from template"
}
}
},
"test_lb": {
"Type": "OS::Neutron::LoadBalancer",
"Properties": {
"pool_id": {"Ref": "HaproxyPool"},
"protocol_port": 80
}
}
}
}
DEFAULT_LB_CONFIG = jsonutils.dumps(DEFAULT_LB_CONFIG_DICT)
DEFAULT_FW_CONFIG_DICT = {
"heat_template_version": "2013-05-23",
"resources": {
'test_fw': {
"type": "OS::Neutron::Firewall",
"properties": {
"admin_state_up": True,
"firewall_policy_id": {
"get_resource": "Firewall_policy"},
"name": "testFirewall",
"description": "test Firewall"
}
},
'test_fw_policy': {
"type": "OS::Neutron::FirewallPolicy",
"properties": {
"shared": False,
"description": "test firewall policy",
"name": "testFWPolicy",
"firewall_rules": [{
"get_resource": "Rule_1"}],
"audited": True
}
}
}
}
DEFAULT_FW_CONFIG = jsonutils.dumps(DEFAULT_FW_CONFIG_DICT)
SERVICE_PROFILE_VENDOR = 'NFP'
def _create_service_profile(self, **kwargs):
if not kwargs.get('insertion_mode'):
kwargs['insertion_mode'] = 'l3'
if not kwargs.get('service_flavor'):
if kwargs['service_type'] == 'LOADBALANCERV2':
kwargs['service_flavor'] = 'haproxy'
else:
kwargs['service_flavor'] = 'vyos'
return super(NFPNodeDriverTestCase, self)._create_service_profile(
**kwargs)
def setUp(self):
config.cfg.CONF.set_override('service_delete_timeout',
SERVICE_DELETE_TIMEOUT,
group='nfp_node_driver')
config.cfg.CONF.set_override(
'extension_drivers', ['proxy_group'], group='group_policy')
config.cfg.CONF.set_override('node_drivers', ['nfp_node_driver'],
group='node_composition_plugin')
config.cfg.CONF.set_override('node_plumber', 'stitching_plumber',
group='node_composition_plugin')
config.cfg.CONF.set_override('policy_drivers',
['implicit_policy', 'resource_mapping',
'chain_mapping'],
group='group_policy')
super(NFPNodeDriverTestCase, self).setUp(
core_plugin=CORE_PLUGIN,
gp_plugin=GP_PLUGIN_KLASS,
sc_plugin=SC_PLUGIN_KLASS)
def test_manager_initialized(self):
mgr = self.plugin.driver_manager
self.assertIsInstance(mgr.ordered_drivers[0].obj,
nfp_node_driver.NFPNodeDriver)
for driver in mgr.ordered_drivers:
self.assertTrue(driver.obj.initialized)
def _nfp_create_profiled_servicechain_node(
self, service_type=constants.LOADBALANCERV2, shared_profile=False,
profile_tenant_id=None, profile_id=None,
service_flavor=None, **kwargs):
if not profile_id:
prof = self.create_service_profile(
service_type=service_type,
shared=shared_profile,
vendor=self.SERVICE_PROFILE_VENDOR,
insertion_mode='l3', service_flavor='haproxy',
tenant_id=profile_tenant_id or self._tenant_id)[
'service_profile']
else:
prof = self.get_service_profile(profile_id)
service_config = kwargs.get('config')
if not service_config or service_config == '{}':
if service_type == constants.FIREWALL:
kwargs['config'] = self.DEFAULT_FW_CONFIG
else:
kwargs['config'] = self.DEFAULT_LB_CONFIG
return self.create_servicechain_node(
service_profile_id=prof['id'], **kwargs)
def _create_simple_fw_service_chain(self, number_of_nodes=1,
service_type='FIREWALL'):
prof = self.create_service_profile(
service_type=service_type,
vendor=self.SERVICE_PROFILE_VENDOR,
insertion_mode='l3', service_flavor='vyos')['service_profile']
node_ids = []
for x in range(number_of_nodes):
node_ids.append(self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_FW_CONFIG,
expected_res_status=201)['servicechain_node']['id'])
return self._nfp_create_chain_with_nodes(node_ids)
def _nfp_create_chain_with_nodes(self, node_ids=None):
node_ids = node_ids or []
spec = self.create_servicechain_spec(
nodes=node_ids,
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
provider = self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})['policy_target_group']
with mock.patch.object(nfp_node_driver.NFPClientApi,
"consumer_ptg_added_notification") as ptg_added:
consumer = self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})[
'policy_target_group']
ptg_added.assert_called_once_with(mock.ANY,
mock.ANY, mock.ANY)
return provider, consumer, prs
def test_spec_parameters(self):
pass
def test_spec_ordering_list_servicechain_instances(self):
pass
class DummyMap(object):
network_function_id = '12'
status = 'UP'
class TestServiceChainInstance(NFPNodeDriverTestCase):
@mock.patch.object(nfp_node_driver.NFPClientApi, 'get_plumbing_info')
def test_node_create(self, plumbing_info):
with mock.patch.object(nfp_node_driver.NFPClientApi,
"create_network_function") as create_nf:
with mock.patch.object(nfp_node_driver.NFPClientApi,
"get_network_function") as get_nf:
create_nf.return_value = {
'id': '126231632163'
}
get_nf.return_value = {
'id': '126231632163',
'status': 'ACTIVE'
}
plumbing_info.return_value = {
'management': [],
'provider': [{}],
'consumer': [{}],
'plumbing_type': 'gateway'
}
self._create_simple_fw_service_chain()
create_nf.assert_called_once_with(mock.ANY, mock.ANY)
def _test_node_update(self):
with mock.patch.object(nfp_node_driver.NFPClientApi,
"create_network_function") as create_nf:
with mock.patch.object(nfp_node_driver.NFPClientApi,
"get_network_function") as get_nf:
with mock.patch.object(nfp_node_driver.NFPClientApi,
"update_service_config") as update_svc_config:
create_nf.return_value = {
'id': '126231632163'
}
get_nf.return_value = {
'id': '126231632163',
'status': 'ACTIVE'
}
prof = self.create_service_profile(
service_type=constants.FIREWALL,
vendor=self.SERVICE_PROFILE_VENDOR,
insertion_mode='l3',
service_flavor='vyos')['service_profile']
self.create_policy_target_group(
name='foo')['policy_target_group']
node = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_FW_CONFIG,
expected_res_status=201)['servicechain_node']
self._nfp_create_chain_with_nodes(node_ids=[node['id']])
self.update_servicechain_node(
node['id'],
name='newname',
expected_res_status=200)
create_nf.assert_called_once_with(mock.ANY, mock.ANY)
update_svc_config.assert_called_once_with()
@mock.patch.object(nfp_node_driver.NFPClientApi, 'get_plumbing_info')
def test_node_delete(self, plumbing_info):
with mock.patch.object(nfp_node_driver.NFPClientApi,
"create_network_function") as create_nf:
with mock.patch.object(nfp_node_driver.NFPClientApi,
'get_network_function') as get_nf:
get_nf.return_value = {
'id': '126231632163',
'status': 'ACTIVE'
}
create_nf.return_value = {
'id': '126231632163'
}
plumbing_info.return_value = {
'management': [],
'provider': [{}],
'consumer': [{}],
'plumbing_type': 'gateway'
}
prof = self.create_service_profile(
service_type=constants.FIREWALL,
vendor=self.SERVICE_PROFILE_VENDOR,
insertion_mode='l3',
service_flavor='vyos')['service_profile']
node_id = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_FW_CONFIG,
expected_res_status=201)['servicechain_node'][
'id']
spec = self.create_servicechain_spec(
nodes=[node_id],
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
provider = self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})[
'policy_target_group']
create_nf.assert_called_once_with(mock.ANY, mock.ANY)
with mock.patch.object(nfp_node_driver.NFPClientApi,
"get_network_function") as get_nf:
with mock.patch.object(nfp_node_driver.NFPClientApi,
"delete_network_function") as delete_nf,\
mock.patch.object(nfp_db.NFPDbBase,
"get_node_instance_network_function_map") as get_map,\
mock.patch.object(nfp_db.NFPDbBase,
"update_node_instance_network_function_map") as update_map:
get_map.return_value = DummyMap()
update_map.return_value = mock.ANY
get_nf.return_value = None
self.delete_policy_target_group(
provider['id'], expected_res_status=204)
delete_nf.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY)
@mock.patch.object(nfp_node_driver.NFPClientApi, 'get_plumbing_info')
def test_wait_for_network_function_delete_completion(self, plumbing_info):
with mock.patch.object(nfp_node_driver.NFPClientApi,
"create_network_function") as create_nf:
with mock.patch.object(nfp_node_driver.NFPClientApi,
'get_network_function') as get_nf:
get_nf.return_value = {
'id': '126231632163',
'status': 'ACTIVE'
}
create_nf.return_value = {
'id': '126231632163'
}
plumbing_info.return_value = {
'management': [],
'provider': [{}],
'consumer': [{}],
'plumbing_type': 'gateway'
}
prof = self.create_service_profile(
service_type=constants.FIREWALL,
vendor=self.SERVICE_PROFILE_VENDOR,
insertion_mode='l3',
service_flavor='vyos')['service_profile']
node_id = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_FW_CONFIG,
expected_res_status=201)['servicechain_node'][
'id']
spec = self.create_servicechain_spec(
nodes=[node_id],
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
provider = self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})[
'policy_target_group']
create_nf.assert_called_once_with(mock.ANY, mock.ANY)
with mock.patch.object(nfp_node_driver.NFPClientApi,
'delete_network_function') as delete_nf:
with mock.patch.object(nfp_node_driver.NFPClientApi,
'get_network_function') as get_nf,\
mock.patch.object(nfp_db.NFPDbBase,
"get_node_instance_network_function_map") as get_map,\
mock.patch.object(nfp_db.NFPDbBase,
"update_node_instance_network_function_map") as \
update_map:
get_map.return_value = DummyMap()
update_map.return_value = mock.ANY
delete_nf.return_value = None
get_nf.return_value = None
# Removing the PRSs will make the PTG deletable again
self.update_policy_target_group(
provider['id'],
provided_policy_rule_sets={},
expected_res_status=200)
self.delete_policy_target_group(provider['id'],
expected_res_status=204)
delete_nf.assert_called_once_with(mock.ANY, mock.ANY,
mock.ANY)
def _create_policy_target_port(self, policy_target_group_id):
pt = self.create_policy_target(
policy_target_group_id=policy_target_group_id)['policy_target']
req = self.new_show_request('ports', pt['port_id'], fmt=self.fmt)
port = self.deserialize(self.fmt,
req.get_response(self.api))['port']
return (pt, port)
@mock.patch.object(nfp_node_driver.NFPClientApi, 'get_plumbing_info')
def test_lb_node_create(self, plumbing_info, consumer_external=False):
with mock.patch.object(nfp_node_driver.NFPClientApi,
"create_network_function") as create_nf:
with mock.patch.object(nfp_node_driver.NFPClientApi,
'get_network_function') as get_nf:
get_nf.return_value = {
'id': '126231632163',
'status': 'ACTIVE'
}
create_nf.return_value = {
'id': '126231632163'
}
plumbing_info.return_value = {
'management': [],
'provider': [{}],
'consumer': [{}],
'plumbing_type': 'endpoint'
}
node_id = self._nfp_create_profiled_servicechain_node(
service_type=constants.LOADBALANCERV2)[
'servicechain_node']['id']
spec = self.create_servicechain_spec(
nodes=[node_id],
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
params = [{'type': 'ip_single', 'name': 'vip_ip',
'value': 'self_subnet'}]
nsp = self.create_network_service_policy(
network_service_params=params)
network_service_policy_id = nsp['network_service_policy']['id']
provider = self.create_policy_target_group(
network_service_policy_id=network_service_policy_id,
provided_policy_rule_sets={prs['id']: ''})[
'policy_target_group']
with mock.patch.object(nfp_node_driver.NFPClientApi,
"policy_target_added_notification") as pt_added:
# Verify notification issued for created PT in the provider
_, port = self._create_policy_target_port(provider['id'])
pt_added.assert_called_once_with(mock.ANY, mock.ANY,
mock.ANY)
if consumer_external:
self._create_external_policy(prs['id'])
else:
self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})
create_nf.assert_called_once_with(mock.ANY, mock.ANY)
def test_invalid_service_type_rejected(self):
node_used = self._nfp_create_profiled_servicechain_node(
service_type="test")['servicechain_node']
spec_used = self.create_servicechain_spec(
nodes=[node_used['id']])['servicechain_spec']
provider = self.create_policy_target_group()['policy_target_group']
classifier = self.create_policy_classifier()['policy_classifier']
res = self.create_servicechain_instance(
provider_ptg_id=provider['id'],
classifier_id=classifier['id'],
servicechain_specs=[spec_used['id']],
expected_res_status=webob.exc.HTTPBadRequest.code)
self.assertEqual('NoDriverAvailableForAction',
res['NeutronError']['type'])
def test_is_node_order_in_spec_supported(self):
lb_prof = self.create_service_profile(
service_type=constants.LOADBALANCERV2,
vendor=self.SERVICE_PROFILE_VENDOR,
insertion_mode='l3',
service_flavor='haproxy')['service_profile']
vpn_prof = self.create_service_profile(
service_type=constants.VPN,
vendor=self.SERVICE_PROFILE_VENDOR,
insertion_mode='l3',
service_flavor='vyos')['service_profile']
vpn_node = self.create_servicechain_node(
service_profile_id=vpn_prof['id'],
config=self.DEFAULT_VPN_CONFIG,
expected_res_status=201)['servicechain_node']
lb_node = self.create_servicechain_node(
service_profile_id=lb_prof['id'],
config=self.DEFAULT_LB_CONFIG,
expected_res_status=201)['servicechain_node']
node_ids = [lb_node['id'], vpn_node['id']]
spec = self.create_servicechain_spec(
nodes=node_ids,
expected_res_status=201)['servicechain_spec']
provider = self.create_policy_target_group()['policy_target_group']
classifier = self.create_policy_classifier()['policy_classifier']
res = self.create_servicechain_instance(
provider_ptg_id=provider['id'],
classifier_id=classifier['id'],
servicechain_specs=[spec['id']],
expected_res_status=webob.exc.HTTPBadRequest.code)
self.assertEqual('NoDriverAvailableForAction',
res['NeutronError']['type'])
@mock.patch.object(nfp_node_driver.NFPClientApi, 'get_plumbing_info')
def test_validate_update(self, plumbing_info):
with mock.patch.object(nfp_node_driver.NFPClientApi,
"create_network_function") as create_nf:
with mock.patch.object(nfp_node_driver.NFPClientApi,
"get_network_function") as get_nf:
create_nf.return_value = {
'id': '126231632163'
}
get_nf.return_value = {
'id': '126231632163',
'status': 'ACTIVE'
}
plumbing_info.return_value = {
'management': [],
'provider': [{}],
'consumer': [{}],
'plumbing_type': 'gateway'
}
fw_prof = self.create_service_profile(
service_type=constants.FIREWALL,
vendor=self.SERVICE_PROFILE_VENDOR,
insertion_mode='l3',
service_flavor='vyos')['service_profile']
fw_node = self.create_servicechain_node(
service_profile_id=fw_prof['id'],
config=self.DEFAULT_FW_CONFIG,
expected_res_status=201)['servicechain_node']
node_ids = [fw_node['id']]
spec = self.create_servicechain_spec(
nodes=node_ids,
expected_res_status=201)['servicechain_spec']
provider = self.create_policy_target_group()[
'policy_target_group']
classifier = self.create_policy_classifier()[
'policy_classifier']
servicechain_instance = self.create_servicechain_instance(
provider_ptg_id=provider['id'],
classifier_id=classifier['id'],
servicechain_specs=[spec['id']])[
'servicechain_instance']
fw_prof = self.create_service_profile(
service_type='test',
vendor=self.SERVICE_PROFILE_VENDOR,
insertion_mode='l3',
service_flavor='vyos')['service_profile']
fw_node = self.create_servicechain_node(
service_profile_id=fw_prof['id'],
config=self.DEFAULT_FW_CONFIG,
expected_res_status=201)['servicechain_node']
node_ids = [fw_node['id']]
spec = self.create_servicechain_spec(
nodes=node_ids,
expected_res_status=201)['servicechain_spec']
create_nf.assert_called_once_with(mock.ANY, mock.ANY)
with mock.patch.object(nfp_node_driver.NFPClientApi,
"get_network_function") as get_nf:
with mock.patch.object(nfp_node_driver.NFPClientApi,
"delete_network_function") as delete_nf,\
mock.patch.object(nfp_db.NFPDbBase,
"get_node_instance_network_function_map") as get_map,\
mock.patch.object(nfp_db.NFPDbBase,
"update_node_instance_network_function_map") as \
update_map:
get_map.return_value = DummyMap()
update_map.return_value = mock.ANY
get_nf.return_value = None
res = self.update_servicechain_instance(
servicechain_instance['id'],
servicechain_specs=[spec['id']],
expected_res_status=webob.exc.HTTPBadRequest.code)
delete_nf.assert_called_once_with(mock.ANY,
mock.ANY, mock.ANY)
self.assertEqual('NoDriverAvailableForAction',
res['NeutronError']['type'])
@mock.patch.object(nfp_node_driver.NFPClientApi, 'get_plumbing_info')
def test_update_node_consumer_ptg_added(self, plumbing_info):
with mock.patch.object(nfp_node_driver.NFPClientApi,
"create_network_function") as create_nf:
with mock.patch.object(nfp_node_driver.NFPClientApi,
'get_network_function') as get_nf:
get_nf.return_value = {
'id': '126231632163',
'status': 'ACTIVE'
}
create_nf.return_value = {
'id': '126231632163'
}
plumbing_info.return_value = {
'management': [],
'provider': [{}],
'consumer': [{}],
'plumbing_type': 'gateway'
}
prof = self.create_service_profile(
service_type=constants.FIREWALL,
vendor=self.SERVICE_PROFILE_VENDOR,
insertion_mode='l3',
service_flavor='vyos')['service_profile']
node_id = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_FW_CONFIG,
expected_res_status=201)['servicechain_node'][
'id']
spec = self.create_servicechain_spec(
nodes=[node_id],
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})[
'policy_target_group']
create_nf.assert_called_once_with(mock.ANY, mock.ANY)
with mock.patch.object(nfp_node_driver.NFPClientApi,
"consumer_ptg_added_notification") as ptg_added:
self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})[
'policy_target_group']
ptg_added.assert_called_once_with(mock.ANY,
mock.ANY, mock.ANY)
def _test_update_node_consumer_ptg_removed(self):
with mock.patch.object(nfp_node_driver.NFPClientApi,
"create_network_function") as create_nf:
with mock.patch.object(nfp_node_driver.NFPClientApi,
'get_network_function') as get_nf:
get_nf.return_value = {
'id': '126231632163',
'status': 'ACTIVE'
}
create_nf.return_value = {
'id': '126231632163'
}
prof = self.create_service_profile(
service_type=constants.FIREWALL,
vendor=self.SERVICE_PROFILE_VENDOR,
insertion_mode='l3',
service_flavor='vyos')['service_profile']
node_id = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_FW_CONFIG,
expected_res_status=201)['servicechain_node'][
'id']
spec = self.create_servicechain_spec(
nodes=[node_id],
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})[
'policy_target_group']
with mock.patch.object(nfp_node_driver.NFPClientApi,
"consumer_ptg_added_notification") as ptg_added:
consumer = self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})[
'policy_target_group']
ptg_added.assert_called_once_with(mock.ANY, mock.ANY,
mock.ANY)
create_nf.assert_called_once_with(mock.ANY, mock.ANY)
with mock.patch.object(nfp_node_driver.NFPClientApi,
"consumer_ptg_removed_notification") as ptg_removed:
self.delete_policy_target_group(
consumer['id'], expected_res_status=204)
ptg_removed.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY)
@mock.patch.object(nfp_node_driver.NFPClientApi, 'get_plumbing_info')
def test_policy_target_add_remove(self, plumbing_info):
prof = self._create_service_profile(
service_type='LOADBALANCERV2',
vendor=self.SERVICE_PROFILE_VENDOR,
insertion_mode='l3', service_flavor='haproxy')['service_profile']
node = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_LB_CONFIG,
expected_res_status=201)['servicechain_node']
spec = self.create_servicechain_spec(
nodes=[node['id']],
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
with mock.patch.object(nfp_node_driver.NFPClientApi,
"create_network_function") as create_nf:
with mock.patch.object(nfp_node_driver.NFPClientApi,
'get_network_function') as get_nf:
get_nf.return_value = {
'id': '126231632163',
'status': 'ACTIVE'
}
create_nf.return_value = {
'id': '126231632163'
}
plumbing_info.return_value = {
'management': [],
'provider': [{}],
'consumer': [{}],
'plumbing_type': 'endpoint'
}
params = [{'type': 'ip_single', 'name': 'vip_ip',
'value': 'self_subnet'}]
nsp = self.create_network_service_policy(
network_service_params=params)
network_service_policy_id = nsp['network_service_policy'][
'id']
provider = self.create_policy_target_group(
network_service_policy_id=network_service_policy_id,
provided_policy_rule_sets={prs['id']: ''})[
'policy_target_group']
self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})
with mock.patch.object(nfp_node_driver.NFPClientApi,
"policy_target_added_notification") as pt_added:
# Verify notification issued for created PT in the provider
pt = self.create_policy_target(
policy_target_group_id=provider['id'])[
'policy_target']
create_nf.assert_called_once_with(mock.ANY, mock.ANY)
pt_added.assert_called_once_with(mock.ANY, mock.ANY,
mock.ANY)
# Verify notification issued for deleted PT in the provider
with mock.patch.object(nfp_node_driver.NFPClientApi,
"policy_target_removed_notification") as pt_removed:
with mock.patch.object(nfp_node_driver.NFPClientApi,
'get_network_function') as get_nf:
get_nf.return_value = {
'id': '126231632163',
'status': 'ACTIVE'
}
self.delete_policy_target(pt['id'])
pt_removed.assert_called_once_with(mock.ANY, mock.ANY,
mock.ANY)
@mock.patch.object(nfp_node_driver.NFPClientApi, 'get_plumbing_info')
def test_policy_target_group_updated(self, plumbing_info):
prof = self._create_service_profile(
service_type='FIREWALL',
vendor=self.SERVICE_PROFILE_VENDOR,
insertion_mode='l3', service_flavor='vyos')['service_profile']
node = self.create_servicechain_node(
service_profile_id=prof['id'],
config=self.DEFAULT_FW_CONFIG,
expected_res_status=201)['servicechain_node']
spec = self.create_servicechain_spec(
nodes=[node['id']])['servicechain_spec']
action = self.create_policy_action(
action_type='REDIRECT', action_value=spec['id'])[
'policy_action']
classifier = self.create_policy_classifier(
direction='bi', protocol='icmp')[
'policy_classifier']
rule = self.create_policy_rule(
policy_classifier_id=classifier['id'],
policy_actions=[action['id']])['policy_rule']
prs = self.create_policy_rule_set(
policy_rules=[rule['id']])['policy_rule_set']
# allow
allow_action = self.create_policy_action(action_type='ALLOW')[
'policy_action']
allow_rule = self.create_policy_rule(
policy_classifier_id=classifier['id'],
policy_actions=[allow_action['id']])['policy_rule']
allow_prs = self.create_policy_rule_set(
policy_rules=[allow_rule['id']])['policy_rule_set']
# ref ptg
ref_ptg = self.create_policy_target_group()['policy_target_group']
ref_pt = self.create_policy_target(
policy_target_group_id=ref_ptg['id'])['policy_target']
with mock.patch.object(nfp_node_driver.NFPClientApi,
"create_network_function") as create_nf:
with mock.patch.object(nfp_node_driver.NFPClientApi,
'get_network_function') as get_nf:
get_nf.return_value = {
'id': '126231632163',
'status': 'ACTIVE'
}
create_nf.return_value = {
'id': '126231632163'
}
plumbing_info.return_value = {
'management': [],
'provider': [{}],
'consumer': [{}],
'plumbing_type': 'gateway'
}
orig_ptg = self.create_policy_target_group(
description="opflex_eoc:%s" % ref_pt['port_id'],
provided_policy_rule_sets={prs['id']: ''})[
'policy_target_group']
current_ptg = self.update_policy_target_group(
orig_ptg['id'],
provided_policy_rule_sets={
prs['id']: '', allow_prs['id']: ''})[
'policy_target_group']
ref_ptg = self.show_policy_target_group(ref_ptg['id'])[
'policy_target_group']
self.assertSetEqual(set(ref_ptg['provided_policy_rule_sets']),
set(current_ptg[
'provided_policy_rule_sets']))

View File

@@ -1,168 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from neutron.common import config # noqa
from neutron_lib import context as n_context
from neutron_lib.plugins import constants as pconst
from oslo_config import cfg
from gbpservice.neutron.services.servicechain.plugins.ncp import model
from gbpservice.neutron.tests.unit.services.grouppolicy import (
test_resource_mapping as test_gp_driver)
from gbpservice.neutron.tests.unit.services.servicechain.ncp import (
test_ncp_plugin as base)
class TrafficStitchingPlumberTestCase(base.NodeCompositionPluginTestCase):
def setUp(self):
cfg.CONF.set_override('policy_drivers', ['implicit_policy',
'resource_mapping'],
group='group_policy')
cfg.CONF.set_override('allow_overlapping_ips', True)
cfg.CONF.set_override(
'extension_drivers', ['proxy_group'], group='group_policy')
super(TrafficStitchingPlumberTestCase, self).setUp(
node_drivers=['node_dummy'], node_plumber='stitching_plumber',
core_plugin=test_gp_driver.CORE_PLUGIN)
self.driver = self.sc_plugin.driver_manager.ordered_drivers[0].obj
self.driver.get_plumbing_info = mock.Mock()
self.driver.get_plumbing_info.return_value = {}
def test_one_gateway_pt_prov_cons(self):
context = n_context.get_admin_context()
self.driver.get_plumbing_info.return_value = {
'provider': [{}], 'consumer': [{}], 'plumbing_type': 'gateway'}
provider, consumer, node = self._create_simple_chain()
provider = self.show_policy_target_group(
provider['id'])['policy_target_group']
# Verify Service PT created and correctly placed
targets = model.get_service_targets(context.session)
self.assertEqual(2, len(targets))
old_relationship = None
for target in targets:
self.assertEqual(node['id'], target.servicechain_node_id)
pt = self.show_policy_target(
target.policy_target_id)['policy_target']
if target.relationship == 'provider':
self.assertEqual(provider['id'],
pt['policy_target_group_id'])
self.assertTrue(pt['group_default_gateway'])
self.assertFalse(pt['proxy_gateway'])
else:
# Consumer side a proxy group exists
self.assertEqual(provider['proxy_group_id'],
pt['policy_target_group_id'])
self.assertFalse(pt['group_default_gateway'])
self.assertTrue(pt['proxy_gateway'])
self.assertNotEqual(old_relationship, target.relationship)
old_relationship = target.relationship
port = self._get_object('ports', pt['port_id'], self.api)['port']
self.assertTrue(port['name'].startswith('pt_service_target_'),
"Port name doesn't start with 'pt_service_target_"
"'.\nport:\n%s\n" % port)
self.update_policy_target_group(
provider['id'], provided_policy_rule_sets={})
# With chain deletion, also the Service PTs are deleted
new_targets = model.get_service_targets(context.session)
self.assertEqual(0, len(new_targets))
for target in targets:
self.show_policy_target(
target.policy_target_id, expected_res_status=404)
provider = self.show_policy_target_group(
provider['id'])['policy_target_group']
self.assertIsNone(provider['proxy_group_id'])
def test_multiple_endpoint_pt_provider(self):
context = n_context.get_admin_context()
self.driver.get_plumbing_info.return_value = {
'provider': [{}, {}], 'consumer': [], 'plumbing_type': 'endpoint'}
provider, consumer, node = self._create_simple_chain()
provider = self.show_policy_target_group(
provider['id'])['policy_target_group']
# Verify Service PT created and contains proper name, description
targets = model.get_service_targets(context.session)
self.assertEqual(2, len(targets))
for target in targets:
pt = self.show_policy_target(
target.policy_target_id)['policy_target']
self.assertEqual(provider['id'],
pt['policy_target_group_id'])
self.assertTrue(pt['name'].startswith('tscp_endpoint_service'),
"Policy Target name doesn't start with "
"'tscp_endpoint_service'.\npt:\n%s\n" % pt)
self.assertTrue(node['id'] in pt['description'],
"Policy Target description doesn't contains "
" node id.\nnode:\n%s\n" % node)
port = self._get_object('ports', pt['port_id'], self.api)['port']
self.assertTrue(port['name'].startswith(
'pt_tscp_endpoint_service'),
"Port name doesn't start with "
"'pt_tscp_endpoint_service'.\nport:\n%s\n" % port)
self.update_policy_target_group(
provider['id'], provided_policy_rule_sets={})
# With chain deletion, also the Service PTs are deleted
new_targets = model.get_service_targets(context.session)
self.assertEqual(0, len(new_targets))
for target in targets:
self.show_policy_target(
target.policy_target_id, expected_res_status=404)
provider = self.show_policy_target_group(
provider['id'])['policy_target_group']
self.assertIsNone(provider['proxy_group_id'])
def get_plumbing_info_base(self, context):
service_type = context.current_profile['service_type']
plumbing_request = {'management': [], 'provider': [{}],
'consumer': [{}]}
if service_type in [pconst.FIREWALL]:
plumbing_request['plumbing_type'] = 'gateway'
else:
plumbing_request = {}
return plumbing_request
def test_get_service_targets_in_chain(self):
context = n_context.get_admin_context()
self.driver.get_plumbing_info = self.get_plumbing_info_base
lb_prof = self._create_service_profile(
service_type='LOADBALANCERV2',
vendor=self.SERVICE_PROFILE_VENDOR)['service_profile']
lb_node = self.create_servicechain_node(
service_profile_id=lb_prof['id'],
config=self.DEFAULT_LB_CONFIG)['servicechain_node']
fw_prof = self._create_service_profile(
service_type='FIREWALL',
vendor=self.SERVICE_PROFILE_VENDOR)['service_profile']
fw_node = self.create_servicechain_node(
service_profile_id=fw_prof['id'],
config='{}')['servicechain_node']
self._create_chain_with_nodes([fw_node['id'], lb_node['id']])
targets = model.get_service_targets(context.session)
self.assertEqual(2, len(targets))
def test_ptg_delete(self):
self.driver.get_plumbing_info.return_value = {
'provider': [{}], 'consumer': [{}],
'plumbing_type': 'transparent'}
provider, _, _ = self._create_simple_service_chain()
# Deleting a PTG will fail because of existing PTs
self.delete_policy_target_group(provider['id'],
expected_res_status=204)

View File

@@ -1,203 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from neutron.common import config # noqa
from neutron_lib import context as n_context
from neutron_lib.plugins import directory
from oslo_config import cfg
from gbpservice.neutron.services.grouppolicy import (
policy_driver_manager as pdm)
from gbpservice.neutron.services.servicechain.plugins.ncp import model
from gbpservice.neutron.tests.unit.services.grouppolicy import (
test_resource_mapping as test_gp_driver)
from gbpservice.neutron.tests.unit.services.servicechain.ncp import (
test_ncp_plugin as base)
GATEWAY = 'gateway'
GATEWAY_HA = 'gateway_ha'
TRANSPARENT = 'transparent'
ENDPOINT = 'endpoint'
info_mapping = {
GATEWAY: {'plumbing_type': GATEWAY, 'provider': [{}], 'consumer': [{}]},
GATEWAY_HA: {'plumbing_type': GATEWAY, 'provider': [{}, {}, {}],
'consumer': [{}, {}, {}]},
TRANSPARENT: {'plumbing_type': TRANSPARENT, 'provider': [{}],
'consumer': [{}]},
ENDPOINT: {'plumbing_type': ENDPOINT, 'provider': [{}]},
}
info_mapping['FIREWALL'] = info_mapping[GATEWAY]
info_mapping['FIREWALL_HA'] = info_mapping[GATEWAY_HA]
info_mapping['TRANSPARENT_FIREWALL'] = info_mapping[TRANSPARENT]
info_mapping['LOADBALANCERV2'] = info_mapping[ENDPOINT]
class ResourceMappingStitchingPlumberGBPTestCase(
test_gp_driver.ResourceMappingTestCase):
def setUp(self):
cfg.CONF.set_override(
'extension_drivers', ['proxy_group'], group='group_policy')
cfg.CONF.set_override('node_plumber', 'stitching_plumber',
group='node_composition_plugin')
ml2_opts = {'mechanism_drivers': ['stitching_gbp'],
'extension_drivers': ['qos']}
host_agents = mock.patch('neutron.plugins.ml2.driver_context.'
'PortContext.host_agents').start()
host_agents.return_value = [self.agent_conf]
qos_plugin = 'qos'
super(ResourceMappingStitchingPlumberGBPTestCase, self).setUp(
sc_plugin=base.SC_PLUGIN_KLASS, ml2_options=ml2_opts,
qos_plugin=qos_plugin)
def get_plumbing_info(context):
return info_mapping.get(context.current_profile['service_type'])
self.node_driver = self.sc_plugin.driver_manager.ordered_drivers[0].obj
self.node_driver.get_plumbing_info = get_plumbing_info
pdm.PolicyDriverManager.get_policy_target_group_status = (
mock.MagicMock({}))
@property
def sc_plugin(self):
return directory.get_plugin('SERVICECHAIN')
class TestPolicyRuleSet(ResourceMappingStitchingPlumberGBPTestCase,
test_gp_driver.TestPolicyRuleSet):
pass
class TestServiceChain(ResourceMappingStitchingPlumberGBPTestCase,
test_gp_driver.TestServiceChain):
def test_parent_ruleset_update_for_redirect(self):
# NCP doesn't support multiple SPECs per instance
pass
def test_enforce_parent_redirect_after_ptg_create(self):
# NCP doesn't support multiple SPECs per instance
pass
def test_hierarchical_redirect(self):
# NCP doesn't support multiple SPECs per instance
pass
def test_redirect_multiple_ptgs_single_prs(self):
# REVISIT(ivar): This test is doing a mock patching that breaks the
# workflow
pass
def test_action_spec_value_update(self):
# NCP doesn't support multiple SPECs per instance
pass
def test_rule_update_hierarchial_prs(self):
# NCP doesn't support multiple SPECs per instance
pass
def test_rule_update_updates_chain(self):
# NCP doesn't support multiple SPECs per instance
pass
class TestServiceChainAdminOwner(ResourceMappingStitchingPlumberGBPTestCase,
test_gp_driver.TestServiceChainAdminOwner):
def test_parent_ruleset_update_for_redirect(self):
# NCP doesn't support multiple SPECs per instance
pass
def test_enforce_parent_redirect_after_ptg_create(self):
# NCP doesn't support multiple SPECs per instance
pass
def test_hierarchical_redirect(self):
# NCP doesn't support multiple SPECs per instance
pass
def test_redirect_multiple_ptgs_single_prs(self):
# REVISIT(ivar): This test is doing a mock patching that breaks the
# workflow
pass
def test_action_spec_value_update(self):
# NCP doesn't support multiple SPECs per instance
pass
def test_rule_update_hierarchial_prs(self):
# NCP doesn't support multiple SPECs per instance
pass
def test_rule_update_updates_chain(self):
# NCP doesn't support multiple SPECs per instance
pass
class TestPolicyAction(ResourceMappingStitchingPlumberGBPTestCase,
test_gp_driver.TestPolicyAction):
pass
class TestPolicyRule(ResourceMappingStitchingPlumberGBPTestCase,
test_gp_driver.TestPolicyRule):
pass
class TestExternalSegment(ResourceMappingStitchingPlumberGBPTestCase,
test_gp_driver.TestExternalSegment):
def test_update(self):
super(TestExternalSegment, self).test_update(
proxy_ip_pool1='182.169.0.0/16',
proxy_ip_pool2='172.169.0.0/16')
class TestExternalPolicy(ResourceMappingStitchingPlumberGBPTestCase,
test_gp_driver.TestExternalPolicy):
pass
class TestImplicitServiceChains(ResourceMappingStitchingPlumberGBPTestCase,
base.NodeCompositionPluginTestMixin):
def test_service_targets_vif_details(self):
context = n_context.get_admin_context()
self._create_simple_service_chain(service_type='TRANSPARENT_FIREWALL')
targets = model.get_service_targets(context.session)
self.assertGreater(len(targets), 0)
for target in targets:
pt = self.show_policy_target(
target.policy_target_id)['policy_target']
# Being service targets, port filter and hybrid plug will be false
port = self._bind_port_to_host(pt['port_id'], 'host')['port']
self.assertFalse(port['binding:vif_details']['port_filter'])
self.assertFalse(port['binding:vif_details']['ovs_hybrid_plug'])
def test_endpoint_target_vif_details(self):
context = n_context.get_admin_context()
self._create_simple_service_chain(service_type='LOADBALANCERV2')
targets = model.get_service_targets(context.session)
self.assertGreater(len(targets), 0)
for target in targets:
pt = self.show_policy_target(
target.policy_target_id)['policy_target']
port = self._bind_port_to_host(pt['port_id'], 'host')['port']
self.assertTrue(port['binding:vif_details']['port_filter'])
# This change sets hybrid VIF plugging to True by default again
# https://github.com/openstack/neutron/commit/
# eca893be5b770c41cfc570dc016a41c30c2cdf23
self.assertTrue(port['binding:vif_details']['ovs_hybrid_plug'])