Apic driver improvements

- Introducing Opflex agent;
- Multiple PTGs per L2Ps;
- Multiple PTGs same subnet.

Change-Id: I5dcb12daed669131270e589a01cee432b53cd61c
This commit is contained in:
Ivar Lazzaro
2015-03-24 18:15:40 -07:00
parent 86f9190d84
commit 65489bbae4
10 changed files with 1336 additions and 449 deletions

View File

@@ -39,4 +39,4 @@ class DictClass(dict):
return self[item]
__setattr__ = dict.__setattr__
__delattr__ = dict.__delattr__
__delattr__ = dict.__delattr__

View File

@@ -13,18 +13,51 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import constants as n_constants
from neutron.extensions import portbindings
from neutron import manager
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import mech_agent
from opflexagent import constants as ofcst
from oslo_log import log
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
apic_mapping as amap)
LOG = log.getLogger(__name__)
class APICMechanismGBPDriver(api.MechanismDriver):
class APICMechanismGBPDriver(mech_agent.AgentMechanismDriverBase):
def __init__(self):
self.vif_details = {portbindings.CAP_PORT_FILTER: False,
portbindings.OVS_HYBRID_PLUG: False}
self.vif_type = portbindings.VIF_TYPE_OVS
super(APICMechanismGBPDriver, self).__init__(
ofcst.AGENT_TYPE_OPFLEX_OVS)
def try_to_bind_segment_for_agent(self, context, segment, agent):
if self.check_segment_for_agent(segment, agent):
context.set_binding(
segment[api.ID], self.vif_type, self.vif_details)
return True
else:
return False
def check_segment_for_agent(self, segment, agent):
network_type = segment[api.NETWORK_TYPE]
if network_type == ofcst.TYPE_OPFLEX:
opflex_mappings = agent['configurations'].get('opflex_networks')
LOG.debug(_("Checking segment: %(segment)s "
"for mappings: %(mappings)s "),
{'segment': segment, 'mappings': opflex_mappings})
return ((opflex_mappings is None) or
(segment[api.PHYSICAL_NETWORK] in opflex_mappings))
else:
return False
def initialize(self):
super(APICMechanismGBPDriver, self).initialize()
self._apic_gbp = None
@property
@@ -36,16 +69,30 @@ class APICMechanismGBPDriver(api.MechanismDriver):
return self._apic_gbp
def create_port_postcommit(self, context):
# DHCP Ports are created implicitly by Neutron, need to inform GBP
if (context.current.get('device_owner') ==
n_constants.DEVICE_OWNER_DHCP):
self.apic_gbp.create_dhcp_policy_target_if_needed(
context._plugin_context, context.current)
self.apic_gbp.process_port_added(
context._plugin_context, context.current)
def update_port_postcommit(self, context):
self.apic_gbp.process_port_changed(context._plugin_context,
context.original, context.current)
def delete_port_precommit(self, context):
self.apic_gbp.process_pre_port_deleted(context._plugin_context,
context.current)
def delete_port_postcommit(self, context):
self.apic_gbp.process_port_deleted(context._plugin_context,
context.current)
def update_subnet_postcommit(self, context):
self.apic_gbp.process_subnet_changed(context._plugin_context,
context.original, context.current)
def create_subnet_postcommit(self, context):
if not context.current['name'].startswith(amap.APIC_OWNED):
self.apic_gbp.process_subnet_added(context._plugin_context,
context.current)
def delete_subnet_postcommit(self, context):
self.apic_gbp.process_subnet_deleted(context._plugin_context,
context.current)

View File

@@ -0,0 +1,60 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystoneclient import auth as ks_auth
from keystoneclient import session as ks_session
from neutron.notifiers import nova as n_nova
from novaclient import client as nclient
from novaclient import exceptions as nova_exceptions
from oslo_config import cfg
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
class NovaClient:
def __init__(self):
auth = ks_auth.load_from_conf_options(cfg.CONF, 'nova')
endpoint_override = None
if not auth:
if cfg.CONF.nova_admin_tenant_id:
endpoint_override = "%s/%s" % (cfg.CONF.nova_url,
cfg.CONF.nova_admin_tenant_id)
auth = n_nova.DefaultAuthPlugin(
auth_url=cfg.CONF.nova_admin_auth_url,
username=cfg.CONF.nova_admin_username,
password=cfg.CONF.nova_admin_password,
tenant_id=cfg.CONF.nova_admin_tenant_id,
tenant_name=cfg.CONF.nova_admin_tenant_name,
endpoint_override=endpoint_override)
session = ks_session.Session.load_from_conf_options(
cfg.CONF, 'nova', auth=auth)
novaclient_cls = nclient.get_client_class(n_nova.NOVA_API_VERSION)
self.nclient = novaclient_cls(
session=session,
region_name=cfg.CONF.nova.region_name)
def get_server(self, server_id):
try:
return self.client.servers.get(server_id)
except nova_exceptions.NotFound:
LOG.warning(_("Nova returned NotFound for server: %s"),
server_id)
except Exception as e:
LOG.exception(e)

View File

@@ -95,6 +95,10 @@ class PolicyTargetGroupContext(GroupPolicyContext,
self._plugin_context, self._policy_target_group['id'], subnet_id)
self._policy_target_group['subnets'] = subnets
def add_subnets(self, subnet_ids):
for subnet_id in subnet_ids:
self.add_subnet(subnet_id)
class L2PolicyContext(GroupPolicyContext, api.L2PolicyContext):

View File

@@ -18,6 +18,7 @@ import mock
import netaddr
import webob.exc
from neutron.agent import securitygroups_rpc as sg_cfg
from neutron.common import rpc as n_rpc
from neutron import context
from neutron.db import api as db_api
@@ -26,16 +27,18 @@ from neutron import manager
from neutron.tests.unit.plugins.ml2.drivers.cisco.apic import (
base as mocked)
from neutron.tests.unit.plugins.ml2 import test_plugin
from opflexagent import constants as ocst
from oslo_config import cfg
sys.modules["apicapi"] = mock.Mock()
from gbpservice.neutron.services.grouppolicy import (
group_policy_context as p_context)
from gbpservice.neutron.services.grouppolicy import config
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
apic_mapping as amap)
from gbpservice.neutron.tests.unit.services.grouppolicy import (
test_grouppolicy_plugin as test_gp_plugin)
test_resource_mapping as test_rmd)
APIC_L2_POLICY = 'l2_policy'
APIC_L3_POLICY = 'l3_policy'
@@ -45,9 +48,15 @@ APIC_POLICY_RULE = 'policy_rule'
APIC_EXTERNAL_RID = '1.0.0.1'
AGENT_TYPE = ocst.AGENT_TYPE_OPFLEX_OVS
AGENT_CONF = {'alive': True, 'binary': 'somebinary',
'topic': 'sometopic', 'agent_type': AGENT_TYPE,
'configurations': {'opflex_networks': None,
'bridge_mappings': {'physnet1': 'br-eth1'}}}
def echo(context, string):
return string
def echo(context, string, prefix=''):
return prefix + string
class MockCallRecorder(mock.Mock):
@@ -65,23 +74,35 @@ class MockCallRecorder(mock.Mock):
class ApicMappingTestCase(
test_gp_plugin.GroupPolicyPluginTestCase,
test_rmd.ResourceMappingTestCase,
mocked.ControllerMixin, mocked.ConfigMixin):
def setUp(self):
config.cfg.CONF.set_override('policy_drivers',
['implicit_policy', 'apic'],
group='group_policy')
cfg.CONF.register_opts(sg_cfg.security_group_opts, 'SECURITYGROUP')
config.cfg.CONF.set_override('enable_security_group', False,
group='SECURITYGROUP')
n_rpc.create_connection = mock.Mock()
amap.ApicMappingDriver.get_apic_manager = mock.Mock()
self.set_up_mocks()
ml2_opts = {
'mechanism_drivers': ['openvswitch', 'apic_gbp'],
'mechanism_drivers': ['apic_gbp'],
'type_drivers': ['opflex'],
'tenant_network_types': ['opflex']
}
for opt, val in ml2_opts.items():
cfg.CONF.set_override(opt, val, 'ml2')
mock.patch('gbpservice.neutron.services.grouppolicy.drivers.cisco.'
'apic.apic_mapping.ApicMappingDriver._setup_rpc').start()
host_agents = mock.patch('neutron.plugins.ml2.driver_context.'
'PortContext.host_agents').start()
host_agents.return_value = [AGENT_CONF]
nova_client = mock.patch(
'gbpservice.neutron.services.grouppolicy.drivers.cisco.'
'apic.nova_client.NovaClient.get_server').start()
vm = mock.Mock()
vm.name = 'someid'
nova_client.return_value = vm
super(ApicMappingTestCase, self).setUp(
core_plugin=test_plugin.PLUGIN_NAME)
policy_drivers=['implicit_policy', 'apic'],
core_plugin=test_plugin.PLUGIN_NAME, ml2_options=ml2_opts)
engine = db_api.get_engine()
model_base.BASEV2.metadata.create_all(engine)
plugin = manager.NeutronManager.get_plugin()
@@ -103,6 +124,7 @@ class ApicMappingTestCase(
self.driver.apic_manager = mock.Mock(name_mapper=mock.Mock(),
ext_net_dict={})
self.driver.apic_manager.apic.transaction = self.fake_transaction
self.driver.notifier = mock.Mock()
amap.apic_manager.TENANT_COMMON = 'common'
self.common_tenant = amap.apic_manager.TENANT_COMMON
@@ -127,34 +149,40 @@ class ApicMappingTestCase(
msg='Call not found, expected:\n%s\nobserved:'
'\n%s' % (str(call), str(observed)))
observed.remove(call)
self.assertFalse(len(observed))
self.assertFalse(
len(observed),
msg='There are more calls than expected: %s' % str(observed))
def _create_simple_policy_rule(self, direction='bi', protocol='tcp',
port_range=80, shared=False,
action_type='allow', action_value=None):
cls = self.create_policy_classifier(
direction=direction, protocol=protocol,
port_range=port_range, shared=shared)['policy_classifier']
action = self.create_policy_action(
action_type=action_type, shared=shared,
action_value=action_value)['policy_action']
return self.create_policy_rule(
policy_classifier_id=cls['id'], policy_actions=[action['id']],
shared=shared)['policy_rule']
def _bind_port_to_host(self, port_id, host):
plugin = manager.NeutronManager.get_plugin()
ctx = context.get_admin_context()
agent = {'host': host}
agent.update(AGENT_CONF)
plugin.create_or_update_agent(ctx, agent)
data = {'port': {'binding:host_id': host, 'device_owner': 'compute:',
'device_id': 'someid'}}
# Create EP with bound port
req = self.new_update_request('ports', data, port_id,
self.fmt)
return self.deserialize(self.fmt, req.get_response(self.api))
class TestPolicyTarget(ApicMappingTestCase):
def test_policy_target_created_on_apic(self):
ptg = self.create_policy_target_group()['policy_target_group']
subnet = self._get_object('subnets', ptg['subnets'][0], self.api)
with self.port(subnet=subnet) as port:
self._bind_port_to_host(port['port']['id'], 'h1')
self.create_policy_target(policy_target_group_id=ptg['id'],
port_id=port['port']['id'])
mgr = self.driver.apic_manager
self.assertEqual(mgr.ensure_path_created_for_port.call_count, 1)
def test_policy_target_port_update_on_apic_none_to_host(self):
ptg = self.create_policy_target_group(
name="ptg1")['policy_target_group']
pt = self.create_policy_target(
policy_target_group_id=ptg['id'])['policy_target']
port = self._get_object('ports', pt['port_id'], self.api)
port_up = self._bind_port_to_host(port['port']['id'], 'h1')
self.driver.process_port_changed(context.get_admin_context(),
port['port'], port_up['port'])
mgr = self.driver.apic_manager
self.assertEqual(mgr.ensure_path_created_for_port.call_count, 1)
def test_policy_target_port_deleted_on_apic(self):
ptg = self.create_policy_target_group()['policy_target_group']
subnet = self._get_object('subnets', ptg['subnets'][0], self.api)
@@ -165,8 +193,7 @@ class TestPolicyTarget(ApicMappingTestCase):
self.new_delete_request(
'policy_targets', pt['policy_target']['id'],
self.fmt).get_response(self.ext_api)
mgr = self.driver.apic_manager
self.assertEqual(mgr.ensure_path_deleted_for_port.call_count, 1)
self.assertTrue(self.driver.notifier.port_update.called)
def test_policy_target_delete_no_port(self):
ptg = self.create_policy_target_group()['policy_target_group']
@@ -179,46 +206,92 @@ class TestPolicyTarget(ApicMappingTestCase):
self.fmt).get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
self.delete_policy_target(pt['policy_target']['id'],
expected_res_status=204)
expected_res_status=404)
def test_policy_target_port_deleted_on_apic_host_to_host(self):
ptg = self.create_policy_target_group()['policy_target_group']
subnet = self._get_object('subnets', ptg['subnets'][0], self.api)
with self.port(subnet=subnet) as port:
# Create EP with bound port
port = self._bind_port_to_host(port['port']['id'], 'h1')
self.create_policy_target(policy_target_group_id=ptg['id'],
port_id=port['port']['id'])
# Change port binding and notify driver
port_up = self._bind_port_to_host(port['port']['id'], 'h2')
self.driver.process_port_changed(context.get_admin_context(),
port['port'], port_up['port'])
mgr = self.driver.apic_manager
# Path created 2 times
self.assertEqual(mgr.ensure_path_created_for_port.call_count,
2)
# Path deleted 1 time
self.assertEqual(mgr.ensure_path_deleted_for_port.call_count, 1)
def test_policy_target_port_not_deleted(self):
# Create 2 EP same PTG same host bound
def test_delete_policy_target_notification_no_apic_network(self):
ptg = self.create_policy_target_group(
name="ptg1")['policy_target_group']
pt1 = self.create_policy_target(
policy_target_group_id=ptg['id'])['policy_target']
self._bind_port_to_host(pt1['port_id'], 'h1')
pt2 = self.create_policy_target(
policy_target_group_id=ptg['id'])['policy_target']
self._bind_port_to_host(pt2['port_id'], 'h1')
# Implicit port will be deleted with the PT
self.delete_policy_target(pt1['id'], expected_res_status=204)
# No notification needed
self.assertFalse(self.driver.notifier.port_update.called)
self.driver.notifier.port_update.reset_mock()
subnet = self._get_object('subnets', ptg['subnets'][0], self.api)
with self.port(subnet=subnet) as port:
# Create EP with bound port
port = self._bind_port_to_host(port['port']['id'], 'h1')
pt1 = self.create_policy_target(
policy_target_group_id=ptg['id'], port_id=port['port']['id'])
# Explicit port won't be deleted with PT
self.delete_policy_target(pt1['policy_target']['id'],
expected_res_status=204)
# Issue notification for the agent
self.assertTrue(self.driver.notifier.port_update.called)
# Delete EP1
self.new_delete_request('policy_targets', pt1['id'],
self.fmt).get_response(self.ext_api)
# APIC path not deleted
mgr = self.driver.apic_manager
self.assertEqual(mgr.ensure_path_deleted_for_port.call_count, 0)
def test_get_gbp_details(self):
ptg = self.create_policy_target_group(
name="ptg1")['policy_target_group']
pt1 = self.create_policy_target(
policy_target_group_id=ptg['id'])['policy_target']
self._bind_port_to_host(pt1['port_id'], 'h1')
mapping = self.driver.get_gbp_details(context.get_admin_context(),
device='tap%s' % pt1['port_id'], host='h1')
self.assertEqual(pt1['port_id'], mapping['port_id'])
self.assertEqual(ptg['id'], mapping['endpoint_group_name'])
self.assertEqual('someid', mapping['vm-name'])
def test_get_gbp_details_shadow(self):
l2p = self.create_l2_policy()['l2_policy']
network = self._get_object('networks', l2p['network_id'], self.api)
with self.subnet(network=network) as sub:
with self.port(subnet=sub) as port:
self._bind_port_to_host(port['port']['id'], 'h1')
mapping = self.driver.get_gbp_details(
context.get_admin_context(),
device='tap%s' % port['port']['id'], host='h1')
self.assertEqual(port['port']['id'], mapping['port_id'])
self.assertEqual(amap.SHADOW_PREFIX + l2p['id'],
mapping['endpoint_group_name'])
def test_explicit_port(self):
with self.network() as net:
with self.subnet(network=net) as sub:
with self.port(subnet=sub) as port:
self._bind_port_to_host(port['port']['id'], 'h1')
l2p = self.create_l2_policy(
network_id=net['network']['id'])['l2_policy']
ptg = self.create_policy_target_group(
l2_policy_id=l2p['id'])['policy_target_group']
self.create_policy_target(
port_id=port['port']['id'],
policy_target_group_id=ptg['id'])
self.assertTrue(self.driver.notifier.port_update.called)
def test_port_notified_on_changed_ptg(self):
ptg = self.create_policy_target_group()['policy_target_group']
ptg2 = self.create_policy_target_group(
l2_policy_id=ptg['l2_policy_id'])['policy_target_group']
pt = self.create_policy_target(
policy_target_group_id=ptg['id'])['policy_target']
self._bind_port_to_host(pt['port_id'], 'h1')
self.driver.notifier.port_update.reset_mock()
self.update_policy_target(pt['id'], policy_target_group_id=ptg2['id'])
self.assertTrue(self.driver.notifier.port_update.called)
def test_update_ptg_failed(self):
ptg = self.create_policy_target_group()['policy_target_group']
ptg2 = self.create_policy_target_group()['policy_target_group']
pt = self.create_policy_target(
policy_target_group_id=ptg['id'])['policy_target']
res = self.update_policy_target(
pt['id'], policy_target_group_id=ptg2['id'],
expected_res_status=400)
self.assertEqual('InvalidPortForPTG', res['NeutronError']['type'])
class TestPolicyTargetGroup(ApicMappingTestCase):
@@ -228,9 +301,14 @@ class TestPolicyTargetGroup(ApicMappingTestCase):
name="ptg1", shared=shared)['policy_target_group']
tenant = self.common_tenant if shared else ptg['tenant_id']
mgr = self.driver.apic_manager
mgr.ensure_epg_created.assert_called_once_with(
tenant, ptg['id'], bd_name=ptg['l2_policy_id'],
bd_owner=tenant)
expected_calls = [
mock.call(tenant, ptg['id'], bd_name=ptg['l2_policy_id'],
bd_owner=tenant),
mock.call(tenant, amap.SHADOW_PREFIX + ptg['l2_policy_id'],
bd_name=ptg['l2_policy_id'], bd_owner=tenant,
transaction=mock.ANY)]
self._check_call_list(
expected_calls, mgr.ensure_epg_created.call_args_list)
def test_policy_target_group_created_on_apic(self):
self._test_policy_target_group_created_on_apic()
@@ -241,22 +319,44 @@ class TestPolicyTargetGroup(ApicMappingTestCase):
def _test_ptg_policy_rule_set_created(self, provider=True, shared=False):
cntr = self.create_policy_rule_set(name='c',
shared=shared)['policy_rule_set']
l2p = self.create_l2_policy()['l2_policy']
mgr = self.driver.apic_manager
mgr.set_contract_for_epg.reset_mock()
if provider:
ptg = self.create_policy_target_group(
l2_policy_id=l2p['id'],
provided_policy_rule_sets={cntr['id']: 'scope'})[
'policy_target_group']
else:
ptg = self.create_policy_target_group(
l2_policy_id=l2p['id'],
consumed_policy_rule_sets={cntr['id']: 'scope'})[
'policy_target_group']
# Verify that the apic call is issued
ct_owner = self.common_tenant if shared else cntr['tenant_id']
mgr = self.driver.apic_manager
mgr.set_contract_for_epg.assert_called_with(
ptg['tenant_id'], ptg['id'], cntr['id'], transaction='transaction',
contract_owner=ct_owner, provider=provider)
expected_calls = [
mock.call(
ptg['tenant_id'], ptg['id'], cntr['id'],
transaction=mock.ANY, contract_owner=ct_owner,
provider=provider),
mock.call(
ptg['tenant_id'], ptg['id'],
amap.SERVICE_PREFIX + ptg['l2_policy_id'],
transaction=mock.ANY, contract_owner=ptg['tenant_id'],
provider=False),
mock.call(
ptg['tenant_id'], ptg['id'],
amap.IMPLICIT_PREFIX + ptg['l2_policy_id'],
transaction=mock.ANY, contract_owner=ptg['tenant_id'],
provider=True),
mock.call(
ptg['tenant_id'], ptg['id'],
amap.IMPLICIT_PREFIX + ptg['l2_policy_id'],
transaction=mock.ANY, contract_owner=ptg['tenant_id'],
provider=False)]
self._check_call_list(expected_calls,
mgr.set_contract_for_epg.call_args_list)
def _test_ptg_policy_rule_set_updated(self, provider=True, shared=False):
p_or_c = {True: 'provided_policy_rule_sets',
@@ -284,12 +384,12 @@ class TestPolicyTargetGroup(ApicMappingTestCase):
ct_owner = self.common_tenant if shared else cntr['tenant_id']
mgr.set_contract_for_epg.assert_called_with(
ptg['tenant_id'], ptg['id'], new_cntr['id'],
contract_owner=ct_owner, transaction='transaction',
contract_owner=ct_owner, transaction=mock.ANY,
provider=provider)
mgr.unset_contract_for_epg.assert_called_with(
ptg['tenant_id'], ptg['id'], cntr['id'],
contract_owner=ct_owner,
transaction='transaction', provider=provider)
transaction=mock.ANY, provider=provider)
def test_ptg_policy_rule_set_provider_created(self):
self._test_ptg_policy_rule_set_created()
@@ -316,15 +416,20 @@ class TestPolicyTargetGroup(ApicMappingTestCase):
self._test_ptg_policy_rule_set_updated(False, shared=True)
def _test_policy_target_group_deleted_on_apic(self, shared=False):
ptg = self.create_policy_target_group(name="ptg1",
shared=shared)['policy_target_group']
ptg = self.create_policy_target_group(
name="ptg1", shared=shared)['policy_target_group']
req = self.new_delete_request('policy_target_groups',
ptg['id'], self.fmt)
req.get_response(self.ext_api)
mgr = self.driver.apic_manager
tenant = self.common_tenant if shared else ptg['tenant_id']
mgr.delete_epg_for_network.assert_called_once_with(
tenant, ptg['id'])
expected_calls = [
mock.call(tenant, ptg['id']),
mock.call(tenant, amap.SHADOW_PREFIX + ptg['l2_policy_id'],
transaction=mock.ANY)]
self._check_call_list(expected_calls,
mgr.delete_epg_for_network.call_args_list)
def test_policy_target_group_deleted_on_apic(self):
self._test_policy_target_group_deleted_on_apic()
@@ -340,7 +445,7 @@ class TestPolicyTargetGroup(ApicMappingTestCase):
tenant = self.common_tenant if shared else ptg['tenant_id']
mgr.ensure_subnet_created_on_apic.assert_called_once_with(
tenant, ptg['l2_policy_id'], '10.0.0.1/24',
transaction='transaction')
transaction=mock.ANY)
def test_policy_target_group_subnet_created_on_apic(self):
self._test_policy_target_group_subnet_created_on_apic()
@@ -364,7 +469,7 @@ class TestPolicyTargetGroup(ApicMappingTestCase):
tenant = self.common_tenant if shared else ptg['tenant_id']
mgr.ensure_subnet_created_on_apic.assert_called_with(
tenant, ptg['l2_policy_id'], '10.0.1.1/24',
transaction='transaction')
transaction=mock.ANY)
def test_policy_target_group_subnet_added(self):
self._test_policy_target_group_subnet_added()
@@ -385,10 +490,10 @@ class TestPolicyTargetGroup(ApicMappingTestCase):
tenant = self.common_tenant if shared else ptg['tenant_id']
mgr.ensure_subnet_created_on_apic.assert_called_once_with(
tenant, ptg['l2_policy_id'], '10.0.0.254/24',
transaction='transaction')
transaction=mock.ANY)
mgr.ensure_subnet_deleted_on_apic.assert_called_with(
tenant, ptg['l2_policy_id'], '10.0.0.1/24',
transaction='transaction')
transaction=mock.ANY)
def test_process_subnet_update(self):
self._test_process_subnet_update()
@@ -396,16 +501,66 @@ class TestPolicyTargetGroup(ApicMappingTestCase):
def test_process_subnet_update_shared(self):
self._test_process_subnet_update(shared=True)
def test_multiple_ptg_per_l2p(self):
l2p = self.create_l2_policy()['l2_policy']
# Create first PTG
ptg1 = self.create_policy_target_group(
l2_policy_id=l2p['id'])['policy_target_group']
ptg2 = self.create_policy_target_group(
l2_policy_id=l2p['id'])['policy_target_group']
self.assertEqual(ptg1['subnets'], ptg2['subnets'])
def test_force_add_subnet(self):
l2p = self.create_l2_policy()['l2_policy']
# Create first PTG
ptg1 = self.create_policy_target_group(
l2_policy_id=l2p['id'])['policy_target_group']
ptg2 = self.create_policy_target_group(
l2_policy_id=l2p['id'])['policy_target_group']
ctx = p_context.PolicyTargetGroupContext(
self.driver.gbp_plugin, context.get_admin_context(), ptg2)
# Emulate force add
self.driver._use_implicit_subnet(ctx, force_add=True)
# There now a new subnet, and it's added to both the PTGs
self.assertEqual(2, len(ctx.current['subnets']))
ptg1 = self.show_policy_target_group(ptg1['id'])['policy_target_group']
self.assertEqual(2, len(ptg1['subnets']))
ptg2 = self.show_policy_target_group(ptg2['id'])['policy_target_group']
self.assertEqual(2, len(ptg2['subnets']))
self.assertEqual(set(ptg1['subnets']), set(ptg2['subnets']))
self.assertNotEqual(ptg2['subnets'][0], ptg2['subnets'][1])
def test_subnets_unique_per_l3p(self):
l3p = self.create_l3_policy(shared=True, tenant_id='admin',
is_admin_context=True)['l3_policy']
l2p1 = self.create_l2_policy(
tenant_id='hr', l3_policy_id=l3p['id'])['l2_policy']
l2p2 = self.create_l2_policy(
tenant_id='eng', l3_policy_id=l3p['id'])['l2_policy']
ptg1 = self.create_policy_target_group(
tenant_id='hr', l2_policy_id=l2p1['id'])['policy_target_group']
ptg2 = self.create_policy_target_group(
tenant_id='eng', l2_policy_id=l2p2['id'])['policy_target_group']
sub_ptg_1 = set(self._get_object('subnets',
x, self.api)['subnet']['cidr']
for x in ptg1['subnets'])
sub_ptg_2 = set(self._get_object('subnets',
x, self.api)['subnet']['cidr']
for x in ptg2['subnets'])
self.assertNotEqual(sub_ptg_1, sub_ptg_2)
self.assertFalse(sub_ptg_1 & sub_ptg_2)
def _create_explicit_subnet_ptg(self, cidr, shared=False):
l2p = self.create_l2_policy(name="l2p", shared=shared)
l2p_id = l2p['l2_policy']['id']
network_id = l2p['l2_policy']['network_id']
network = self._get_object('networks', network_id, self.api)
with self.subnet(network=network, cidr=cidr) as subnet:
subnet_id = subnet['subnet']['id']
with self.subnet(network=network, cidr=cidr):
# The subnet creation in the proper network causes the subnet ID
# to be added to the PTG
return self.create_policy_target_group(
name="ptg1", l2_policy_id=l2p_id,
subnets=[subnet_id], shared=shared)['policy_target_group']
shared=shared)['policy_target_group']
class TestL2Policy(ApicMappingTestCase):
@@ -416,7 +571,11 @@ class TestL2Policy(ApicMappingTestCase):
tenant = self.common_tenant if shared else l2p['tenant_id']
mgr = self.driver.apic_manager
mgr.ensure_bd_created_on_apic.assert_called_once_with(
tenant, l2p['id'], ctx_owner=tenant, ctx_name=l2p['l3_policy_id'])
tenant, l2p['id'], ctx_owner=tenant, ctx_name=l2p['l3_policy_id'],
transaction=mock.ANY)
mgr.ensure_epg_created.assert_called_once_with(
tenant, amap.SHADOW_PREFIX + l2p['id'], bd_owner=tenant,
bd_name=l2p['id'], transaction=mock.ANY)
def test_l2_policy_created_on_apic(self):
self._test_l2_policy_created_on_apic()
@@ -431,7 +590,17 @@ class TestL2Policy(ApicMappingTestCase):
tenant = self.common_tenant if shared else l2p['tenant_id']
mgr = self.driver.apic_manager
mgr.delete_bd_on_apic.assert_called_once_with(
tenant, l2p['id'])
tenant, l2p['id'], transaction=mock.ANY)
mgr.delete_epg_for_network.assert_called_once_with(
tenant, amap.SHADOW_PREFIX + l2p['id'],
transaction=mock.ANY)
expected_calls = [
mock.call(amap.IMPLICIT_PREFIX + l2p['id'], owner=tenant,
transaction=mock.ANY),
mock.call(amap.SERVICE_PREFIX + l2p['id'], owner=tenant,
transaction=mock.ANY)]
self._check_call_list(expected_calls,
mgr.delete_contract.call_args_list)
def test_l2_policy_deleted_on_apic(self):
self._test_l2_policy_deleted_on_apic()
@@ -439,6 +608,21 @@ class TestL2Policy(ApicMappingTestCase):
def test_l2_policy_deleted_on_apic_shared(self):
self._test_l2_policy_deleted_on_apic(shared=True)
def test_pre_existing_subnets_added(self):
with self.network() as net:
with self.subnet(network=net) as sub:
sub = sub['subnet']
l2p = self.create_l2_policy(
network_id=net['network']['id'])['l2_policy']
mgr = self.driver.apic_manager
mgr.ensure_subnet_created_on_apic.assert_called_with(
l2p['tenant_id'], l2p['id'],
sub['gateway_ip'] + '/' + sub['cidr'].split('/')[1],
transaction=mock.ANY)
ptg = self.create_policy_target_group(
l2_policy_id=l2p['id'])['policy_target_group']
self.assertEqual(ptg['subnets'], [sub['id']])
class TestL3Policy(ApicMappingTestCase):
@@ -541,7 +725,7 @@ class TestL3Policy(ApicMappingTestCase):
transaction=mock.ANY)
mgr.ensure_logical_node_profile_created.assert_called_once_with(
es['id'], mocked.APIC_EXT_SWITCH, mocked.APIC_EXT_MODULE,
mocked.APIC_EXT_PORT, mocked.APIC_EXT_ENCAP, '192.168.0.3',
mocked.APIC_EXT_PORT, mocked.APIC_EXT_ENCAP, '192.168.0.3/24',
owner=owner, router_id=APIC_EXTERNAL_RID,
transaction=mock.ANY)
@@ -596,7 +780,7 @@ class TestL3Policy(ApicMappingTestCase):
transaction=mock.ANY)
mgr.ensure_logical_node_profile_created.assert_called_once_with(
es['id'], mocked.APIC_EXT_SWITCH, mocked.APIC_EXT_MODULE,
mocked.APIC_EXT_PORT, mocked.APIC_EXT_ENCAP, '192.168.0.3',
mocked.APIC_EXT_PORT, mocked.APIC_EXT_ENCAP, '192.168.0.3/24',
owner=owner, router_id=APIC_EXTERNAL_RID,
transaction=mock.ANY)
@@ -718,7 +902,7 @@ class TestL3Policy(ApicMappingTestCase):
transaction=mock.ANY)
mgr.ensure_logical_node_profile_created.assert_called_once_with(
es2['id'], mocked.APIC_EXT_SWITCH, mocked.APIC_EXT_MODULE,
mocked.APIC_EXT_PORT, mocked.APIC_EXT_ENCAP, '192.168.1.3',
mocked.APIC_EXT_PORT, mocked.APIC_EXT_ENCAP, '192.168.1.3/24',
owner=owner, router_id=APIC_EXTERNAL_RID,
transaction=mock.ANY)
self.assertFalse(mgr.ensure_static_route_created.called)
@@ -806,7 +990,7 @@ class TestPolicyRuleSet(ApicMappingTestCase):
tenant = self.common_tenant if shared else ct['tenant_id']
mgr = self.driver.apic_manager
mgr.create_contract.assert_called_once_with(
ct['id'], owner=tenant, transaction='transaction')
ct['id'], owner=tenant, transaction=mock.ANY)
def test_policy_rule_set_created_on_apic(self):
self._test_policy_rule_set_created_on_apic()
@@ -825,26 +1009,49 @@ class TestPolicyRuleSet(ApicMappingTestCase):
rule_owner = self.common_tenant if shared else rules[0]['tenant_id']
# Verify that the in-out rules are correctly enforced on the APIC
mgr = self.driver.apic_manager
mgr.manage_contract_subject_in_filter.assert_called_once_with(
ctr['id'], ctr['id'], rules[in_d]['id'], owner=ctr['tenant_id'],
transaction='transaction', unset=False,
rule_owner=rule_owner)
mgr.manage_contract_subject_out_filter.assert_called_once_with(
ctr['id'], ctr['id'], rules[out]['id'], owner=ctr['tenant_id'],
transaction='transaction', unset=False,
rule_owner=rule_owner)
expected_calls = [
mock.call(ctr['id'], ctr['id'], rules[in_d]['id'],
owner=ctr['tenant_id'], transaction=mock.ANY,
unset=False, rule_owner=rule_owner),
mock.call(ctr['id'], ctr['id'],
amap.REVERSE_PREFIX + rules[out]['id'],
owner=ctr['tenant_id'], transaction=mock.ANY,
unset=False, rule_owner=rule_owner)]
self._check_call_list(
expected_calls,
mgr.manage_contract_subject_in_filter.call_args_list)
expected_calls = [
mock.call(ctr['id'], ctr['id'], rules[out]['id'],
owner=ctr['tenant_id'], transaction=mock.ANY,
unset=False, rule_owner=rule_owner),
mock.call(ctr['id'], ctr['id'],
amap.REVERSE_PREFIX + rules[in_d]['id'],
owner=ctr['tenant_id'], transaction=mock.ANY,
unset=False, rule_owner=rule_owner)]
self._check_call_list(
expected_calls,
mgr.manage_contract_subject_out_filter.call_args_list)
# Create policy_rule_set with BI rule
ctr = self.create_policy_rule_set(
name="ctr", policy_rules=[rules[bi]['id']])['policy_rule_set']
mgr.manage_contract_subject_in_filter.assert_called_with(
mgr.manage_contract_subject_in_filter.call_happened_with(
ctr['id'], ctr['id'], rules[bi]['id'], owner=ctr['tenant_id'],
transaction='transaction', unset=False,
transaction=mock.ANY, unset=False,
rule_owner=rule_owner)
mgr.manage_contract_subject_out_filter.assert_called_with(
mgr.manage_contract_subject_out_filter.call_happened_with(
ctr['id'], ctr['id'], rules[bi]['id'], owner=ctr['tenant_id'],
transaction='transaction', unset=False,
transaction=mock.ANY, unset=False,
rule_owner=rule_owner)
mgr.manage_contract_subject_in_filter.call_happened_with(
ctr['id'], ctr['id'], amap.REVERSE_PREFIX + rules[bi]['id'],
owner=ctr['tenant_id'], transaction=mock.ANY, unset=False,
rule_owner=rule_owner)
mgr.manage_contract_subject_out_filter.call_happened_with(
ctr['id'], ctr['id'], amap.REVERSE_PREFIX + rules[bi]['id'],
owner=ctr['tenant_id'], transaction=mock.ANY, unset=False,
rule_owner=rule_owner)
def test_policy_rule_set_created_with_rules(self):
@@ -950,13 +1157,25 @@ class TestPolicyRuleSet(ApicMappingTestCase):
class TestPolicyRule(ApicMappingTestCase):
def _test_policy_rule_created_on_apic(self, shared=False):
pr = self._create_simple_policy_rule('in', 'udp', 88, shared=shared)
pr = self._create_simple_policy_rule('in', 'tcp', 88, shared=shared)
tenant = self.common_tenant if shared else pr['tenant_id']
mgr = self.driver.apic_manager
mgr.create_tenant_filter.assert_called_once_with(
pr['id'], owner=tenant, etherT='ip', prot='udp',
dToPort=88, dFromPort=88)
expected_calls = [
mock.call(pr['id'], owner=tenant, etherT='ip', prot='tcp',
dToPort=88, dFromPort=88, transaction=mock.ANY),
mock.call(amap.REVERSE_PREFIX + pr['id'], owner=tenant,
etherT='ip', prot='tcp', sToPort=88, sFromPort=88,
tcpRules='est', transaction=mock.ANY)]
self._check_call_list(
expected_calls, mgr.create_tenant_filter.call_args_list)
mgr.reset_mock()
pr = self._create_simple_policy_rule('bi', None, None, shared=shared)
expected_calls = [
mock.call(pr['id'], owner=tenant, etherT='unspecified',
transaction=mock.ANY)]
self._check_call_list(
expected_calls, mgr.create_tenant_filter.call_args_list)
def test_policy_rule_created_on_apic(self):
self._test_policy_rule_created_on_apic()
@@ -981,8 +1200,12 @@ class TestPolicyRule(ApicMappingTestCase):
tenant = self.common_tenant if shared else pr['tenant_id']
mgr = self.driver.apic_manager
mgr.delete_tenant_filter.assert_called_once_with(
pr['id'], owner=tenant)
expected_calls = [
mock.call(pr['id'], owner=tenant, transaction=mock.ANY),
mock.call(amap.REVERSE_PREFIX + pr['id'], owner=tenant,
transaction=mock.ANY)]
self._check_call_list(
expected_calls, mgr.delete_tenant_filter.call_args_list)
def test_policy_rule_deleted_on_apic(self):
self._test_policy_rule_deleted_on_apic()
@@ -990,17 +1213,137 @@ class TestPolicyRule(ApicMappingTestCase):
def test_policy_rule_deleted_on_apic_shared(self):
self._test_policy_rule_deleted_on_apic(shared=True)
def _create_simple_policy_rule(self, direction='bi', protocol='tcp',
port_range=80, shared=False):
cls = self.create_policy_classifier(
direction=direction, protocol=protocol,
port_range=port_range, shared=shared)['policy_classifier']
def test_policy_classifier_updated(self):
pa = self.create_policy_action(
action_type='allow', is_admin_context=True,
tenant_id='admin', shared=True)['policy_action']
pc = self.create_policy_classifier(
direction='in', protocol='udp', port_range=80,
shared=True, is_admin_context=True,
tenant_id='admin')['policy_classifier']
pr1 = self.create_policy_rule(
policy_classifier_id=pc['id'], policy_actions=[pa['id']],
shared=True, is_admin_context=True,
tenant_id='admin')['policy_rule']
pr2 = self.create_policy_rule(policy_classifier_id=pc['id'],
policy_actions=[pa['id']])['policy_rule']
prs1 = self.create_policy_rule_set(
policy_rules=[pr1['id']])['policy_rule_set']
prs2 = self.create_policy_rule_set(
policy_rules=[pr2['id'], pr1['id']])['policy_rule_set']
action = self.create_policy_action(
action_type='allow', shared=shared)['policy_action']
return self.create_policy_rule(
policy_classifier_id=cls['id'], policy_actions=[action['id']],
shared=shared)['policy_rule']
mgr = self.driver.apic_manager
mgr.reset_mock()
# Remove Classifier port, should just delete and create the filter
self.update_policy_classifier(pc['id'], port_range=None,
is_admin_context=True)
expected_calls = [
mock.call(pr1['id'], owner='common', etherT='ip', prot='udp',
transaction=mock.ANY),
mock.call(pr2['id'], owner='test-tenant', etherT='ip', prot='udp',
transaction=mock.ANY)]
self._check_call_list(
expected_calls, mgr.create_tenant_filter.call_args_list)
expected_calls = [
mock.call(pr1['id'], owner='common', transaction=mock.ANY),
mock.call(pr2['id'], owner='test-tenant', transaction=mock.ANY),
mock.call(amap.REVERSE_PREFIX + pr1['id'], owner='common',
transaction=mock.ANY),
mock.call(amap.REVERSE_PREFIX + pr2['id'], owner='test-tenant',
transaction=mock.ANY)]
self._check_call_list(
expected_calls, mgr.delete_tenant_filter.call_args_list)
self.assertFalse(mgr.manage_contract_subject_in_filter.called)
self.assertFalse(mgr.manage_contract_subject_out_filter.called)
mgr.reset_mock()
# Change Classifier protocol, to not revertible
self.update_policy_classifier(pc['id'], protocol='icmp',
is_admin_context=True)
expected_calls = [
mock.call(pr1['id'], owner='common', etherT='ip', prot='icmp',
transaction=mock.ANY),
mock.call(pr2['id'], owner='test-tenant', etherT='ip', prot='icmp',
transaction=mock.ANY)]
self._check_call_list(
expected_calls, mgr.create_tenant_filter.call_args_list)
expected_calls = [
mock.call(pr1['id'], owner='common', transaction=mock.ANY),
mock.call(pr2['id'], owner='test-tenant', transaction=mock.ANY),
mock.call(amap.REVERSE_PREFIX + pr1['id'], owner='common',
transaction=mock.ANY),
mock.call(amap.REVERSE_PREFIX + pr2['id'], owner='test-tenant',
transaction=mock.ANY)]
self._check_call_list(
expected_calls, mgr.delete_tenant_filter.call_args_list)
self.assertFalse(mgr.manage_contract_subject_in_filter.called)
self.assertFalse(mgr.manage_contract_subject_out_filter.called)
mgr.reset_mock()
# Change Classifier protocol to revertible
self.update_policy_classifier(pc['id'], protocol='tcp',
is_admin_context=True)
expected_calls = [
mock.call(pr1['id'], owner='common', transaction=mock.ANY),
mock.call(pr2['id'], owner='test-tenant', transaction=mock.ANY),
mock.call(amap.REVERSE_PREFIX + pr1['id'], owner='common',
transaction=mock.ANY),
mock.call(amap.REVERSE_PREFIX + pr2['id'], owner='test-tenant',
transaction=mock.ANY)]
self._check_call_list(
expected_calls, mgr.delete_tenant_filter.call_args_list)
expected_calls = [
mock.call(pr1['id'], owner='common', etherT='ip', prot='tcp',
transaction=mock.ANY),
mock.call(pr2['id'], owner='test-tenant', etherT='ip', prot='tcp',
transaction=mock.ANY),
mock.call(amap.REVERSE_PREFIX + pr1['id'], owner='common',
etherT='ip', prot='tcp', tcpRules='est',
transaction=mock.ANY),
mock.call(amap.REVERSE_PREFIX + pr2['id'], owner='test-tenant',
etherT='ip', prot='tcp', tcpRules='est',
transaction=mock.ANY)]
self._check_call_list(
expected_calls, mgr.create_tenant_filter.call_args_list)
expected_calls = [
# Unset PR1 and PR2 IN
mock.call(prs1['id'], prs1['id'], pr1['id'], owner='test-tenant',
transaction=mock.ANY, unset=True, rule_owner='common'),
mock.call(prs2['id'], prs2['id'], pr1['id'], owner='test-tenant',
transaction=mock.ANY, unset=True, rule_owner='common'),
mock.call(prs2['id'], prs2['id'], pr2['id'], owner='test-tenant',
transaction=mock.ANY, unset=True,
rule_owner='test-tenant'),
# SET PR1 and PR2 IN
mock.call(prs1['id'], prs1['id'], pr1['id'], owner='test-tenant',
transaction=mock.ANY, unset=False, rule_owner='common'),
mock.call(prs2['id'], prs2['id'], pr1['id'], owner='test-tenant',
transaction=mock.ANY, unset=False, rule_owner='common'),
mock.call(prs2['id'], prs2['id'], pr2['id'], owner='test-tenant',
transaction=mock.ANY, unset=False,
rule_owner='test-tenant')
]
self._check_call_list(
expected_calls,
mgr.manage_contract_subject_in_filter.call_args_list)
# SET Reverse PR1 and PR2 OUT
expected_calls = [
mock.call(prs1['id'], prs1['id'], amap.REVERSE_PREFIX + pr1['id'],
owner='test-tenant', transaction=mock.ANY, unset=False,
rule_owner='common'),
mock.call(prs2['id'], prs2['id'], amap.REVERSE_PREFIX + pr1['id'],
owner='test-tenant', transaction=mock.ANY, unset=False,
rule_owner='common'),
mock.call(prs2['id'], prs2['id'], amap.REVERSE_PREFIX + pr2['id'],
owner='test-tenant', transaction=mock.ANY, unset=False,
rule_owner='test-tenant')
]
self._check_call_list(
expected_calls,
mgr.manage_contract_subject_out_filter.call_args_list)
class TestExternalSegment(ApicMappingTestCase):

View File

@@ -49,12 +49,10 @@ class FakeDriver(object):
class GroupPolicyPluginTestCase(tgpmdb.GroupPolicyMappingDbTestCase):
def setUp(self, core_plugin=None, gp_plugin=None):
def setUp(self, core_plugin=None, gp_plugin=None, ml2_options=None):
if not gp_plugin:
gp_plugin = GP_PLUGIN_KLASS
ml2_opts = {
'mechanism_drivers': ['openvswitch'],
}
ml2_opts = ml2_options or {'mechanism_drivers': ['openvswitch']}
for opt, val in ml2_opts.items():
cfg.CONF.set_override(opt, val, 'ml2')
core_plugin = core_plugin or test_plugin.PLUGIN_NAME

View File

@@ -61,7 +61,8 @@ CORE_PLUGIN = ('gbpservice.neutron.tests.unit.services.grouppolicy.'
class ResourceMappingTestCase(test_plugin.GroupPolicyPluginTestCase):
def setUp(self, policy_drivers=None):
def setUp(self, policy_drivers=None,
core_plugin=n_test_plugin.PLUGIN_NAME, ml2_options=None):
policy_drivers = policy_drivers or ['implicit_policy',
'resource_mapping']
config.cfg.CONF.set_override('policy_drivers',
@@ -71,8 +72,8 @@ class ResourceMappingTestCase(test_plugin.GroupPolicyPluginTestCase):
['dummy'],
group='servicechain')
config.cfg.CONF.set_override('allow_overlapping_ips', True)
super(ResourceMappingTestCase, self).setUp(
core_plugin=n_test_plugin.PLUGIN_NAME)
super(ResourceMappingTestCase, self).setUp(core_plugin=core_plugin,
ml2_options=ml2_options)
engine = db_api.get_engine()
model_base.BASEV2.metadata.create_all(engine)
res = mock.patch('neutron.db.l3_db.L3_NAT_dbonly_mixin.'
@@ -323,6 +324,37 @@ class ResourceMappingTestCase(test_plugin.GroupPolicyPluginTestCase):
filter_by(policy_target_group_id=ptg_id).
all())
def _create_service_profile(self, node_type='LOADBALANCER'):
data = {'service_profile': {'service_type': node_type,
'tenant_id': self._tenant_id}}
scn_req = self.new_create_request(SERVICE_PROFILES, data, self.fmt)
node = self.deserialize(self.fmt, scn_req.get_response(self.ext_api))
scn_id = node['service_profile']['id']
return scn_id
def _create_servicechain_node(self, node_type="LOADBALANCER"):
profile_id = self._create_service_profile(node_type)
data = {'servicechain_node': {'service_profile_id': profile_id,
'tenant_id': self._tenant_id,
'config': "{}"}}
scn_req = self.new_create_request(SERVICECHAIN_NODES, data, self.fmt)
node = self.deserialize(self.fmt, scn_req.get_response(self.ext_api))
scn_id = node['servicechain_node']['id']
return scn_id
def _create_servicechain_spec(self, node_types=[]):
if not node_types:
node_types = ['LOADBALANCER']
node_ids = []
for node_type in node_types:
node_ids.append(self._create_servicechain_node(node_type))
data = {'servicechain_spec': {'tenant_id': self._tenant_id,
'nodes': node_ids}}
scs_req = self.new_create_request(SERVICECHAIN_SPECS, data, self.fmt)
spec = self.deserialize(self.fmt, scs_req.get_response(self.ext_api))
scs_id = spec['servicechain_spec']['id']
return scs_id
class TestPolicyTarget(ResourceMappingTestCase):
@@ -1480,37 +1512,6 @@ class TestPolicyRuleSet(ResourceMappingTestCase):
self._verify_prs_rules(policy_rule_set_id)
def _create_service_profile(self, node_type='LOADBALANCER'):
data = {'service_profile': {'service_type': node_type,
'tenant_id': self._tenant_id}}
scn_req = self.new_create_request(SERVICE_PROFILES, data, self.fmt)
node = self.deserialize(self.fmt, scn_req.get_response(self.ext_api))
scn_id = node['service_profile']['id']
return scn_id
def _create_servicechain_node(self, node_type="LOADBALANCER"):
profile_id = self._create_service_profile(node_type)
data = {'servicechain_node': {'service_profile_id': profile_id,
'tenant_id': self._tenant_id,
'config': "{}"}}
scn_req = self.new_create_request(SERVICECHAIN_NODES, data, self.fmt)
node = self.deserialize(self.fmt, scn_req.get_response(self.ext_api))
scn_id = node['servicechain_node']['id']
return scn_id
def _create_servicechain_spec(self, node_types=[]):
if not node_types:
node_types = ['LOADBALANCER']
node_ids = []
for node_type in node_types:
node_ids.append(self._create_servicechain_node(node_type))
data = {'servicechain_spec': {'tenant_id': self._tenant_id,
'nodes': node_ids}}
scs_req = self.new_create_request(SERVICECHAIN_SPECS, data, self.fmt)
spec = self.deserialize(self.fmt, scs_req.get_response(self.ext_api))
scs_id = spec['servicechain_spec']['id']
return scs_id
def _create_provider_consumer_ptgs(self, prs_id=None):
policy_rule_set_dict = {prs_id: None} if prs_id else {}
provider_ptg = self.create_policy_target_group(

View File

@@ -1,4 +1,5 @@
# The order of packages is significant, because pip processes them in the order
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
oslosphinx>=2.5.0,<2.6.0 # Apache-2.0

View File

@@ -20,3 +20,5 @@ requests-mock>=0.5.1 # Apache-2.0
testrepository>=0.0.18
testtools>=0.9.36,!=1.2.0
WebTest>=2.0
-e git+https://github.com/noironetworks/python-opflex-agent.git#egg=opflexagent